mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-26 15:28:35 +03:00
contrib/es9p_client: Add support for reporting notifications to SM-DP+
The ES9+ interface is not only used for downloading eSIM profiles, but it is also used to report back the installation result as well as profile management operations like enable/disable/delete. Change-Id: Iefba7fa0471b34eae30700ed43531a515af0eb93
This commit is contained in:
@@ -29,8 +29,9 @@ from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
|
|||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
|
||||||
import pySim.esim.rsp as rsp
|
import pySim.esim.rsp as rsp
|
||||||
from pySim.esim import es9p
|
from pySim.esim import es9p, PMO
|
||||||
from pySim.utils import h2b, b2h, swap_nibbles, bertlv_parse_one_rawtag, bertlv_return_one_rawtlv
|
from pySim.utils import h2b, b2h, swap_nibbles, is_hexstr
|
||||||
|
from pySim.utils import bertlv_parse_one_rawtag, bertlv_return_one_rawtlv
|
||||||
from pySim.esim.x509_cert import CertAndPrivkey
|
from pySim.esim.x509_cert import CertAndPrivkey
|
||||||
from pySim.esim.es8p import BoundProfilePackage
|
from pySim.esim.es8p import BoundProfilePackage
|
||||||
|
|
||||||
@@ -63,6 +64,29 @@ parser_dl.add_argument('--output-path', default='.',
|
|||||||
parser_dl.add_argument('--confirmation-code',
|
parser_dl.add_argument('--confirmation-code',
|
||||||
help="Confirmation Code for the eSIM download")
|
help="Confirmation Code for the eSIM download")
|
||||||
|
|
||||||
|
# notification
|
||||||
|
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
|
||||||
|
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
|
||||||
|
help='Profile Management Opreation whoise occurrence shall be notififed')
|
||||||
|
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
|
||||||
|
help='eUICC global notification sequence number')
|
||||||
|
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
||||||
|
parser_ntf.add_argument('--iccid', type=is_hexstr, help='ICCID to which the notification relates')
|
||||||
|
|
||||||
|
# notification-install
|
||||||
|
parser_ntfi = subparsers.add_parser('notification-install', help='ES9+ installation notification')
|
||||||
|
parser_ntfi.add_argument('--sequence-nr', type=int, required=True,
|
||||||
|
help='eUICC global notification sequence number')
|
||||||
|
parser_ntfi.add_argument('--transaction-id', required=True,
|
||||||
|
help='transactionId of previous ES9+ download')
|
||||||
|
parser_ntfi.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
||||||
|
parser_ntfi.add_argument('--iccid', type=is_hexstr, help='ICCID to which the notification relates')
|
||||||
|
parser_ntfi.add_argument('--smdpp-oid', required=True, help='SM-DP+ OID (as in CERT.DPpb.ECDSA)')
|
||||||
|
parser_ntfi.add_argument('--isdp-aid', type=is_hexstr, required=True,
|
||||||
|
help='AID of the ISD-P of the installed profile')
|
||||||
|
parser_ntfi.add_argument('--sima-response', type=is_hexstr, required=True,
|
||||||
|
help='hex digits of BER-encoded SAIP EUICCResponse')
|
||||||
|
|
||||||
class Es9pClient:
|
class Es9pClient:
|
||||||
def __init__(self, opts):
|
def __init__(self, opts):
|
||||||
self.opts = opts
|
self.opts = opts
|
||||||
@@ -91,6 +115,49 @@ class Es9pClient:
|
|||||||
self.peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert)
|
self.peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert)
|
||||||
|
|
||||||
|
|
||||||
|
def do_notification(self):
|
||||||
|
|
||||||
|
ntf_metadata = {
|
||||||
|
'seqNumber': self.opts.sequence_nr,
|
||||||
|
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
|
||||||
|
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
|
||||||
|
}
|
||||||
|
if opts.iccid:
|
||||||
|
ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
|
||||||
|
|
||||||
|
if self.opts.operation == 'download':
|
||||||
|
pird = {
|
||||||
|
'transactionId': self.opts.transaction_id,
|
||||||
|
'notificationMetadata': ntf_metadata,
|
||||||
|
'smdpOid': self.opts.smdpp_oid,
|
||||||
|
'finalResult': ('successResult', {
|
||||||
|
'aid': self.opts.isdp_aid,
|
||||||
|
'simaResponse': self.opts.sima_response,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||||
|
signature = self.cert_and_key.ecdsa_sign(pird_bin)
|
||||||
|
pn_dict = ('profileInstallationResult', {
|
||||||
|
'profileInstallationResultData': pird,
|
||||||
|
'euiccSignPIR': signature,
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
ntf_bin = rsp.asn1.encode('NotificationMetadata', ntf_metadata)
|
||||||
|
signature = self.cert_and_key.ecdsa_sign(ntf_bin)
|
||||||
|
pn_dict = ('otherSignedNotification', {
|
||||||
|
'tbsOtherNotification': ntf_metadata,
|
||||||
|
'euiccNotificationSignature': signature,
|
||||||
|
'euiccCertificate': rsp.asn1.decode('Certificate', self.cert_and_key.get_cert_as_der()),
|
||||||
|
'eumCertificate': rsp.asn1.decode('Certificate', self.eum_cert.public_bytes(Encoding.DER)),
|
||||||
|
})
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'pendingNotification': pn_dict,
|
||||||
|
}
|
||||||
|
#print(data)
|
||||||
|
res = self.peer.call_handleNotification(data)
|
||||||
|
|
||||||
|
|
||||||
def do_download(self):
|
def do_download(self):
|
||||||
|
|
||||||
print("Step 1: InitiateAuthentication...")
|
print("Step 1: InitiateAuthentication...")
|
||||||
@@ -243,3 +310,8 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
if opts.command == 'download':
|
if opts.command == 'download':
|
||||||
c.do_download()
|
c.do_download()
|
||||||
|
elif opts.command == 'notification':
|
||||||
|
c.do_notification()
|
||||||
|
elif opts.command == 'notification-install':
|
||||||
|
opts.operation = 'install'
|
||||||
|
c.do_notification()
|
||||||
|
|||||||
@@ -1,7 +1,51 @@
|
|||||||
import sys
|
import sys
|
||||||
from typing import Optional
|
from typing import Optional, Tuple
|
||||||
from importlib import resources
|
from importlib import resources
|
||||||
|
|
||||||
|
class PMO:
|
||||||
|
"""Convenience conversion class for ProfileManagementOperation as used in ES9+ notifications."""
|
||||||
|
pmo4operation = {
|
||||||
|
'install': 0x80,
|
||||||
|
'enable': 0x40,
|
||||||
|
'disable': 0x20,
|
||||||
|
'delete': 0x10,
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, op: str):
|
||||||
|
if not op in self.pmo4operation:
|
||||||
|
raise ValueError('Unknown operation "%s"' % op)
|
||||||
|
self.op = op
|
||||||
|
|
||||||
|
def to_int(self):
|
||||||
|
return self.pmo4operation[self.op]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _num_bits(data: int)-> int:
|
||||||
|
for i in range(0, 8):
|
||||||
|
if data & (1 << i):
|
||||||
|
return 8-i
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def to_bitstring(self) -> Tuple[bytes, int]:
|
||||||
|
"""return value in a format as used by asn1tools for BITSTRING."""
|
||||||
|
val = self.to_int()
|
||||||
|
return (bytes([val]), self._num_bits(val))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_int(cls, i: int) -> 'PMO':
|
||||||
|
"""Parse an integer representation."""
|
||||||
|
for k, v in cls.pmo4operation.items():
|
||||||
|
if v == i:
|
||||||
|
return cls(k)
|
||||||
|
raise ValueError('Unknown PMO 0x%02x' % i)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bitstring(cls, bstr: Tuple[bytes, int]) -> 'PMO':
|
||||||
|
"""Parse a asn1tools BITSTRING representation."""
|
||||||
|
return cls.from_int(bstr[0][0])
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.op
|
||||||
|
|
||||||
def compile_asn1_subdir(subdir_name:str, codec='der'):
|
def compile_asn1_subdir(subdir_name:str, codec='der'):
|
||||||
"""Helper function that compiles ASN.1 syntax from all files within given subdir"""
|
"""Helper function that compiles ASN.1 syntax from all files within given subdir"""
|
||||||
|
|||||||
Reference in New Issue
Block a user