es9p_client: Move code into a class; do common steps in constructor

This is in preparation of supporting more than just 'download'

Change-Id: I5a165efcb97d9264369a9c6571cd92022cbcdfb0
This commit is contained in:
Harald Welte
2024-07-15 16:52:15 +02:00
parent 0519e2b7e1
commit 9d0c2947f1

View File

@@ -41,19 +41,21 @@ Utility to manually issue requests against the ES9+ API of an SM-DP+ according t
parser.add_argument('--url', required=True, help='Base URL of ES9+ API endpoint') parser.add_argument('--url', required=True, help='Base URL of ES9+ API endpoint')
parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In
production use cases, this would be the GSMA Root CA (CI) certificate.""") production use cases, this would be the GSMA Root CA (CI) certificate.""")
parser.add_argument('--certificate-path', default='.',
help="Path in which to look for certificate and key files.")
parser.add_argument('--euicc-certificate', default='CERT_EUICC_ECDSA_NIST.der',
help="File name of DER-encoded eUICC certificate file.")
parser.add_argument('--euicc-private-key', default='SK_EUICC_ECDSA_NIST.pem',
help="File name of PEM-format eUICC secret key file.")
parser.add_argument('--eum-certificate', default='CERT_EUM_ECDSA_NIST.der',
help="File name of DER-encoded EUM certificate file.")
parser.add_argument('--ci-certificate', default='CERT_CI_ECDSA_NIST.der',
help="File name of DER-encoded CI certificate file.")
subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call", required=True) subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call", required=True)
# download
parser_dl = subparsers.add_parser('download', help="ES9+ download") parser_dl = subparsers.add_parser('download', help="ES9+ download")
parser_dl.add_argument('--certificate-path', default='.',
help="Path in which to look for certificate and key files.")
parser_dl.add_argument('--euicc-certificate', default='CERT_EUICC_ECDSA_NIST.der',
help="File name of DER-encoded eUICC certificate file.")
parser_dl.add_argument('--euicc-private-key', default='SK_EUICC_ECDSA_NIST.pem',
help="File name of PEM-format eUICC secret key file.")
parser_dl.add_argument('--eum-certificate', default='CERT_EUM_ECDSA_NIST.der',
help="File name of DER-encoded EUM certificate file.")
parser_dl.add_argument('--ci-certificate', default='CERT_CI_ECDSA_NIST.der',
help="File name of DER-encoded CI certificate file.")
parser_dl.add_argument('--matchingId', required=True, parser_dl.add_argument('--matchingId', required=True,
help='MatchingID that shall be used by profile download') help='MatchingID that shall be used by profile download')
parser_dl.add_argument('--output-path', default='.', parser_dl.add_argument('--output-path', default='.',
@@ -61,178 +63,183 @@ parser_dl.add_argument('--output-path', default='.',
parser_dl.add_argument('--confirmation-code', parser_dl.add_argument('--confirmation-code',
help="Confirmation Code for the eSIM download") help="Confirmation Code for the eSIM download")
class Es9pClient:
def __init__(self, opts):
self.opts = opts
self.cert_and_key = CertAndPrivkey()
self.cert_and_key.cert_from_der_file(os.path.join(opts.certificate_path, opts.euicc_certificate))
self.cert_and_key.privkey_from_pem_file(os.path.join(opts.certificate_path, opts.euicc_private_key))
def do_download(opts): with open(os.path.join(opts.certificate_path, opts.eum_certificate), 'rb') as f:
self.eum_cert = x509.load_der_x509_certificate(f.read())
cert_and_key = CertAndPrivkey() with open(os.path.join(opts.certificate_path, opts.ci_certificate), 'rb') as f:
cert_and_key.cert_from_der_file(os.path.join(opts.certificate_path, opts.euicc_certificate)) self.ci_cert = x509.load_der_x509_certificate(f.read())
cert_and_key.privkey_from_pem_file(os.path.join(opts.certificate_path, opts.euicc_private_key)) subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), self.ci_cert.extensions))
subject_pkid = subject_exts[0].value
self.ci_pkid = subject_pkid.key_identifier
with open(os.path.join(opts.certificate_path, opts.eum_certificate), 'rb') as f: print("EUICC: %s" % self.cert_and_key.cert.subject)
eum_cert = x509.load_der_x509_certificate(f.read()) print("EUM: %s" % self.eum_cert.subject)
print("CI: %s" % self.ci_cert.subject)
with open(os.path.join(opts.certificate_path, opts.ci_certificate), 'rb') as f: self.eid = self.cert_and_key.cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
ci_cert = x509.load_der_x509_certificate(f.read()) print("EID: %s" % self.eid)
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), ci_cert.extensions)) print("CI PKID: %s" % b2h(self.ci_pkid))
subject_pkid = subject_exts[0].value print()
ci_pkid = subject_pkid.key_identifier
print("EUICC: %s" % cert_and_key.cert.subject) self.peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert)
print("EUM: %s" % eum_cert.subject)
print("CI: %s" % ci_cert.subject)
eid = cert_and_key.cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
print("EID: %s" % eid)
print("CI PKID: %s" % b2h(ci_pkid))
print()
peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert) def do_download(self):
print("Step 1: InitiateAuthentication...") print("Step 1: InitiateAuthentication...")
euiccInfo1 = { euiccInfo1 = {
'svn': b'\x02\x04\x00', 'svn': b'\x02\x04\x00',
'euiccCiPKIdListForVerification': [ 'euiccCiPKIdListForVerification': [
ci_pkid, self.ci_pkid,
], ],
'euiccCiPKIdListForSigning': [ 'euiccCiPKIdListForSigning': [
ci_pkid, self.ci_pkid,
], ],
} }
data = { data = {
'euiccChallenge': os.urandom(16), 'euiccChallenge': os.urandom(16),
'euiccInfo1': euiccInfo1, 'euiccInfo1': euiccInfo1,
'smdpAddress': urlparse(opts.url).netloc, 'smdpAddress': urlparse(self.opts.url).netloc,
} }
init_auth_res = peer.call_initiateAuthentication(data) init_auth_res = self.peer.call_initiateAuthentication(data)
print(init_auth_res) print(init_auth_res)
print("Step 2: AuthenticateClient...") print("Step 2: AuthenticateClient...")
#res['serverSigned1'] #res['serverSigned1']
#res['serverSignature1'] #res['serverSignature1']
print("TODO: verify serverSignature1 over serverSigned1") print("TODO: verify serverSignature1 over serverSigned1")
#res['transactionId'] #res['transactionId']
print("TODO: verify transactionId matches the signed one in serverSigned1") print("TODO: verify transactionId matches the signed one in serverSigned1")
#res['euiccCiPKIdToBeUsed'] #res['euiccCiPKIdToBeUsed']
# TODO: select eUICC certificate based on CI # TODO: select eUICC certificate based on CI
#res['serverCertificate'] #res['serverCertificate']
# TODO: verify server certificate against CI # TODO: verify server certificate against CI
euiccInfo2 = { euiccInfo2 = {
'profileVersion': b'\x02\x03\x01', 'profileVersion': b'\x02\x03\x01',
'svn': euiccInfo1['svn'], 'svn': euiccInfo1['svn'],
'euiccFirmwareVer': b'\x23\x42\x00', 'euiccFirmwareVer': b'\x23\x42\x00',
'extCardResource': b'\x81\x01\x00\x82\x04\x00\x04\x9ch\x83\x02"#', 'extCardResource': b'\x81\x01\x00\x82\x04\x00\x04\x9ch\x83\x02"#',
'uiccCapability': (b'k6\xd3\xc3', 32), 'uiccCapability': (b'k6\xd3\xc3', 32),
'javacardVersion': b'\x11\x02\x00', 'javacardVersion': b'\x11\x02\x00',
'globalplatformVersion': b'\x02\x03\x00', 'globalplatformVersion': b'\x02\x03\x00',
'rspCapability': (b'\x9c', 6), 'rspCapability': (b'\x9c', 6),
'euiccCiPKIdListForVerification': euiccInfo1['euiccCiPKIdListForVerification'], 'euiccCiPKIdListForVerification': euiccInfo1['euiccCiPKIdListForVerification'],
'euiccCiPKIdListForSigning': euiccInfo1['euiccCiPKIdListForSigning'], 'euiccCiPKIdListForSigning': euiccInfo1['euiccCiPKIdListForSigning'],
#'euiccCategory': #'euiccCategory':
#'forbiddenProfilePolicyRules': #'forbiddenProfilePolicyRules':
'ppVersion': b'\x01\x00\x00', 'ppVersion': b'\x01\x00\x00',
'sasAcreditationNumber': 'OSMOCOM-TEST-1', #TODO: make configurable 'sasAcreditationNumber': 'OSMOCOM-TEST-1', #TODO: make configurable
#'certificationDataObject': #'certificationDataObject':
} }
euiccSigned1 = { euiccSigned1 = {
'transactionId': h2b(init_auth_res['transactionId']), 'transactionId': h2b(init_auth_res['transactionId']),
'serverAddress': init_auth_res['serverSigned1']['serverAddress'], 'serverAddress': init_auth_res['serverSigned1']['serverAddress'],
'serverChallenge': init_auth_res['serverSigned1']['serverChallenge'], 'serverChallenge': init_auth_res['serverSigned1']['serverChallenge'],
'euiccInfo2': euiccInfo2, 'euiccInfo2': euiccInfo2,
'ctxParams1': 'ctxParams1':
('ctxParamsForCommonAuthentication', { ('ctxParamsForCommonAuthentication', {
'matchingId': opts.matchingId, 'matchingId': self.opts.matchingId,
'deviceInfo': { 'deviceInfo': {
'tac': b'\x35\x23\x01\x45', # same as lpac 'tac': b'\x35\x23\x01\x45', # same as lpac
'deviceCapabilities': {}, 'deviceCapabilities': {},
#imei: #imei:
} }
}), }),
} }
euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1) euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1)
euiccSignature1 = cert_and_key.ecdsa_sign(euiccSigned1_bin) euiccSignature1 = self.cert_and_key.ecdsa_sign(euiccSigned1_bin)
auth_clnt_req = { auth_clnt_req = {
'transactionId': init_auth_res['transactionId'], 'transactionId': init_auth_res['transactionId'],
'authenticateServerResponse': 'authenticateServerResponse':
('authenticateResponseOk', { ('authenticateResponseOk', {
'euiccSigned1': euiccSigned1, 'euiccSigned1': euiccSigned1,
'euiccSignature1': euiccSignature1, 'euiccSignature1': euiccSignature1,
'euiccCertificate': rsp.asn1.decode('Certificate', cert_and_key.get_cert_as_der()), 'euiccCertificate': rsp.asn1.decode('Certificate', self.cert_and_key.get_cert_as_der()),
'eumCertificate': rsp.asn1.decode('Certificate', eum_cert.public_bytes(Encoding.DER)) 'eumCertificate': rsp.asn1.decode('Certificate', self.eum_cert.public_bytes(Encoding.DER))
}) })
} }
auth_clnt_res = peer.call_authenticateClient(auth_clnt_req) auth_clnt_res = self.peer.call_authenticateClient(auth_clnt_req)
print(auth_clnt_res) print(auth_clnt_res)
#auth_clnt_res['transactionId'] #auth_clnt_res['transactionId']
print("TODO: verify transactionId matches previous ones") print("TODO: verify transactionId matches previous ones")
#auth_clnt_res['profileMetadata'] #auth_clnt_res['profileMetadata']
# TODO: what's in here? # TODO: what's in here?
#auth_clnt_res['smdpSigned2']['bppEuiccOtpk'] #auth_clnt_res['smdpSigned2']['bppEuiccOtpk']
#auth_clnt_res['smdpSignature2'] #auth_clnt_res['smdpSignature2']
print("TODO: verify serverSignature2 over smdpSigned2") print("TODO: verify serverSignature2 over smdpSigned2")
smdp_cert = x509.load_der_x509_certificate(auth_clnt_res['smdpCertificate']) smdp_cert = x509.load_der_x509_certificate(auth_clnt_res['smdpCertificate'])
print("Step 3: GetBoundProfilePackage...") print("Step 3: GetBoundProfilePackage...")
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
# Reference value of CERT.DPpb.ECDSA # Reference value of CERT.DPpb.ECDSA
euicc_ot = ec.generate_private_key(smdp_cert.public_key().public_numbers().curve) euicc_ot = ec.generate_private_key(smdp_cert.public_key().public_numbers().curve)
# extract the public key in (hopefully) the right format for the ES8+ interface # extract the public key in (hopefully) the right format for the ES8+ interface
euicc_otpk = euicc_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) euicc_otpk = euicc_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
euiccSigned2 = { euiccSigned2 = {
'transactionId': h2b(auth_clnt_res['transactionId']), 'transactionId': h2b(auth_clnt_res['transactionId']),
'euiccOtpk': euicc_otpk, 'euiccOtpk': euicc_otpk,
#hashCC #hashCC
} }
# check for smdpSigned2 ccRequiredFlag, and send it in PrepareDownloadRequest hashCc # check for smdpSigned2 ccRequiredFlag, and send it in PrepareDownloadRequest hashCc
if auth_clnt_res['smdpSigned2']['ccRequiredFlag']: if auth_clnt_res['smdpSigned2']['ccRequiredFlag']:
if not opts.confirmation_code: if not self.opts.confirmation_code:
raise ValueError('Confirmation Code required but not provided') raise ValueError('Confirmation Code required but not provided')
cc_hash = hashlib.sha256(opts.confirmation_code.encode('ascii')).digest() cc_hash = hashlib.sha256(self.opts.confirmation_code.encode('ascii')).digest()
euiccSigned2['hashCc'] = hashlib.sha256(cc_hash + euiccSigned2['transactionId']).digest() euiccSigned2['hashCc'] = hashlib.sha256(cc_hash + euiccSigned2['transactionId']).digest()
euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2) euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2)
euiccSignature2 = cert_and_key.ecdsa_sign(euiccSigned2_bin + auth_clnt_res['smdpSignature2']) euiccSignature2 = self.cert_and_key.ecdsa_sign(euiccSigned2_bin + auth_clnt_res['smdpSignature2'])
gbp_req = { gbp_req = {
'transactionId': auth_clnt_res['transactionId'], 'transactionId': auth_clnt_res['transactionId'],
'prepareDownloadResponse': 'prepareDownloadResponse':
('downloadResponseOk', { ('downloadResponseOk', {
'euiccSigned2': euiccSigned2, 'euiccSigned2': euiccSigned2,
'euiccSignature2': euiccSignature2, 'euiccSignature2': euiccSignature2,
}) })
} }
gbp_res = peer.call_getBoundProfilePackage(gbp_req) gbp_res = self.peer.call_getBoundProfilePackage(gbp_req)
print(gbp_res) print(gbp_res)
#gbp_res['transactionId'] #gbp_res['transactionId']
# TODO: verify transactionId # TODO: verify transactionId
print("TODO: verify transactionId matches previous ones") print("TODO: verify transactionId matches previous ones")
bpp_bin = gbp_res['boundProfilePackage'] bpp_bin = gbp_res['boundProfilePackage']
print("TODO: verify boundProfilePackage smdpSignature") print("TODO: verify boundProfilePackage smdpSignature")
bpp = BoundProfilePackage() bpp = BoundProfilePackage()
upp_bin = bpp.decode(euicc_ot, eid, bpp_bin) upp_bin = bpp.decode(euicc_ot, self.eid, bpp_bin)
iccid = swap_nibbles(b2h(bpp.storeMetadataRequest['iccid'])) iccid = swap_nibbles(b2h(bpp.storeMetadataRequest['iccid']))
base_name = os.path.join(opts.output_path, '%s' % iccid) base_name = os.path.join(self.opts.output_path, '%s' % iccid)
print("SUCCESS: Storing files as %s.*.der" % base_name) print("SUCCESS: Storing files as %s.*.der" % base_name)
# write various output files # write various output files
with open(base_name+'.upp.der', 'wb') as f: with open(base_name+'.upp.der', 'wb') as f:
f.write(bpp.upp) f.write(bpp.upp)
with open(base_name+'.isdp.der', 'wb') as f: with open(base_name+'.isdp.der', 'wb') as f:
f.write(bpp.encoded_configureISDPRequest) f.write(bpp.encoded_configureISDPRequest)
with open(base_name+'.smr.der', 'wb') as f: with open(base_name+'.smr.der', 'wb') as f:
f.write(bpp.encoded_storeMetadataRequest) f.write(bpp.encoded_storeMetadataRequest)
if __name__ == '__main__': if __name__ == '__main__':
opts = parser.parse_args() opts = parser.parse_args()
c = Es9pClient(opts)
if opts.command == 'download': if opts.command == 'download':
do_download(opts) c.do_download()