mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-30 10:12:10 +03:00
smdpp: less verbose by default
Those data blobs are huge. Change-Id: I04a72b8f52417862d4dcba1f0743700dd942ef49
This commit is contained in:
@@ -47,6 +47,9 @@ from pySim.esim.es8p import *
|
|||||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
||||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
||||||
|
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# HACK: make this configurable
|
# HACK: make this configurable
|
||||||
DATA_DIR = './smdpp-data'
|
DATA_DIR = './smdpp-data'
|
||||||
HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
|
HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
|
||||||
@@ -82,18 +85,18 @@ def get_eum_certificate_variant(eum_cert) -> str:
|
|||||||
|
|
||||||
for policy in cert_policies_ext.value:
|
for policy in cert_policies_ext.value:
|
||||||
policy_oid = policy.policy_identifier.dotted_string
|
policy_oid = policy.policy_identifier.dotted_string
|
||||||
print(f"Found certificate policy: {policy_oid}")
|
logger.debug(f"Found certificate policy: {policy_oid}")
|
||||||
|
|
||||||
if policy_oid == '2.23.146.1.2.1.2':
|
if policy_oid == '2.23.146.1.2.1.2':
|
||||||
print("Detected EUM certificate variant: O (old)")
|
logger.debug("Detected EUM certificate variant: O (old)")
|
||||||
return 'O'
|
return 'O'
|
||||||
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
||||||
print("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
||||||
return 'NEW'
|
return 'NEW'
|
||||||
except x509.ExtensionNotFound:
|
except x509.ExtensionNotFound:
|
||||||
print("No Certificate Policies extension found")
|
logger.debug("No Certificate Policies extension found")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error checking certificate policies: {e}")
|
logger.debug(f"Error checking certificate policies: {e}")
|
||||||
|
|
||||||
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||||
"""Extract permitted IINs from EUM certificate using the appropriate method
|
"""Extract permitted IINs from EUM certificate using the appropriate method
|
||||||
@@ -106,17 +109,15 @@ def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
|||||||
|
|
||||||
if cert_variant == 'O':
|
if cert_variant == 'O':
|
||||||
# Old variant - use nameConstraints extension
|
# Old variant - use nameConstraints extension
|
||||||
print("Using nameConstraints parsing for variant O certificate")
|
|
||||||
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
||||||
print("Using GSMA permittedEins parsing for newer certificate variant")
|
|
||||||
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
||||||
|
|
||||||
unique_iins = list(set(permitted_iins))
|
unique_iins = list(set(permitted_iins))
|
||||||
|
|
||||||
print(f"Total unique permitted IINs found: {len(unique_iins)}")
|
logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}")
|
||||||
return unique_iins
|
return unique_iins
|
||||||
|
|
||||||
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||||
@@ -130,7 +131,7 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
|||||||
|
|
||||||
for ext in eum_cert.extensions:
|
for ext in eum_cert.extensions:
|
||||||
if ext.oid == permitted_eins_oid:
|
if ext.oid == permitted_eins_oid:
|
||||||
print(f"Found GSMA permittedEins extension: {ext.oid}")
|
logger.debug(f"Found GSMA permittedEins extension: {ext.oid}")
|
||||||
|
|
||||||
# Get the DER-encoded extension value
|
# Get the DER-encoded extension value
|
||||||
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
||||||
@@ -156,14 +157,14 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
|||||||
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
||||||
len(iin_clean) % 2 == 0):
|
len(iin_clean) % 2 == 0):
|
||||||
permitted_iins.append(iin_clean)
|
permitted_iins.append(iin_clean)
|
||||||
print(f"Found permitted IIN (GSMA): {iin_clean}")
|
logger.debug(f"Found permitted IIN (GSMA): {iin_clean}")
|
||||||
else:
|
else:
|
||||||
print(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error parsing GSMA permittedEins extension: {e}")
|
logger.debug(f"Error parsing GSMA permittedEins extension: {e}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error accessing GSMA certificate extensions: {e}")
|
logger.debug(f"Error accessing GSMA certificate extensions: {e}")
|
||||||
|
|
||||||
return permitted_iins
|
return permitted_iins
|
||||||
|
|
||||||
@@ -178,13 +179,11 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
||||||
)
|
)
|
||||||
|
|
||||||
print("Found nameConstraints extension (variant O)")
|
|
||||||
name_constraints = name_constraints_ext.value
|
name_constraints = name_constraints_ext.value
|
||||||
|
|
||||||
# Check permittedSubtrees for IIN constraints
|
# Check permittedSubtrees for IIN constraints
|
||||||
if name_constraints.permitted_subtrees:
|
if name_constraints.permitted_subtrees:
|
||||||
for subtree in name_constraints.permitted_subtrees:
|
for subtree in name_constraints.permitted_subtrees:
|
||||||
print(f"Processing permitted subtree: {subtree}")
|
|
||||||
|
|
||||||
if isinstance(subtree, x509.DirectoryName):
|
if isinstance(subtree, x509.DirectoryName):
|
||||||
for attribute in subtree.value:
|
for attribute in subtree.value:
|
||||||
@@ -196,12 +195,12 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
all(c in '0123456789ABCDEF' for c in serial_value) and
|
all(c in '0123456789ABCDEF' for c in serial_value) and
|
||||||
len(serial_value) % 2 == 0):
|
len(serial_value) % 2 == 0):
|
||||||
permitted_iins.append(serial_value)
|
permitted_iins.append(serial_value)
|
||||||
print(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
||||||
|
|
||||||
except x509.ExtensionNotFound:
|
except x509.ExtensionNotFound:
|
||||||
print("No nameConstraints extension found")
|
logger.debug("No nameConstraints extension found")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error parsing nameConstraints: {e}")
|
logger.debug(f"Error parsing nameConstraints: {e}")
|
||||||
|
|
||||||
return permitted_iins
|
return permitted_iins
|
||||||
|
|
||||||
@@ -209,29 +208,29 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
def validate_eid_range(eid: str, eum_cert) -> bool:
|
def validate_eid_range(eid: str, eum_cert) -> bool:
|
||||||
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
||||||
if not eid or len(eid) != 32:
|
if not eid or len(eid) != 32:
|
||||||
print(f"Invalid EID format: {eid}")
|
logger.debug(f"Invalid EID format: {eid}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
||||||
|
|
||||||
if not permitted_eins:
|
if not permitted_eins:
|
||||||
print("Warning: No permitted EINs found in EUM certificate")
|
logger.debug("Warning: No permitted EINs found in EUM certificate")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
eid_normalized = eid.upper()
|
eid_normalized = eid.upper()
|
||||||
print(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
||||||
|
|
||||||
for permitted_ein in permitted_eins:
|
for permitted_ein in permitted_eins:
|
||||||
if eid_normalized.startswith(permitted_ein):
|
if eid_normalized.startswith(permitted_ein):
|
||||||
print(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
print(f"EID {eid_normalized} is not in any permitted EIN list")
|
logger.debug(f"EID {eid_normalized} is not in any permitted EIN list")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error validating EID: {e}")
|
logger.debug(f"Error validating EID: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
||||||
@@ -299,11 +298,11 @@ class SmDppHttpServer:
|
|||||||
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
||||||
"""Find CI certificate for given key identifier."""
|
"""Find CI certificate for given key identifier."""
|
||||||
for cert in self.ci_certs:
|
for cert in self.ci_certs:
|
||||||
print("cert: %s" % cert)
|
logger.debug("cert: %s" % cert)
|
||||||
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
||||||
print(subject_exts)
|
logger.debug(subject_exts)
|
||||||
subject_pkid = subject_exts[0].value
|
subject_pkid = subject_exts[0].value
|
||||||
print(subject_pkid)
|
logger.debug(subject_pkid)
|
||||||
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
||||||
return cert
|
return cert
|
||||||
return None
|
return None
|
||||||
@@ -356,7 +355,7 @@ class SmDppHttpServer:
|
|||||||
validate_request_headers(request)
|
validate_request_headers(request)
|
||||||
|
|
||||||
content = json.loads(request.content.read())
|
content = json.loads(request.content.read())
|
||||||
print("Rx JSON: %s" % json.dumps(content))
|
logger.debug("Rx JSON: %s" % json.dumps(content))
|
||||||
set_headers(request)
|
set_headers(request)
|
||||||
|
|
||||||
output = func(self, request, content)
|
output = func(self, request, content)
|
||||||
@@ -364,7 +363,7 @@ class SmDppHttpServer:
|
|||||||
return ''
|
return ''
|
||||||
|
|
||||||
build_resp_header(output)
|
build_resp_header(output)
|
||||||
print("Tx JSON: %s" % json.dumps(output))
|
logger.debug("Tx JSON: %s" % json.dumps(output))
|
||||||
return json.dumps(output)
|
return json.dumps(output)
|
||||||
return _api_wrapper
|
return _api_wrapper
|
||||||
|
|
||||||
@@ -383,7 +382,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
||||||
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
||||||
print("Rx euiccInfo1: %s" % euiccInfo1)
|
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
|
||||||
#euiccInfo1['svn']
|
#euiccInfo1['svn']
|
||||||
|
|
||||||
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
||||||
@@ -427,9 +426,9 @@ class SmDppHttpServer:
|
|||||||
'serverAddress': self.server_hostname,
|
'serverAddress': self.server_hostname,
|
||||||
'serverChallenge': serverChallenge,
|
'serverChallenge': serverChallenge,
|
||||||
}
|
}
|
||||||
print("Tx serverSigned1: %s" % serverSigned1)
|
logger.debug("Tx serverSigned1: %s" % serverSigned1)
|
||||||
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
||||||
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||||
output = {}
|
output = {}
|
||||||
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
||||||
|
|
||||||
@@ -458,7 +457,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
||||||
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
||||||
print("Rx %s: %s" % authenticateServerResp)
|
logger.debug("Rx %s: %s" % authenticateServerResp)
|
||||||
if authenticateServerResp[0] == 'authenticateResponseError':
|
if authenticateServerResp[0] == 'authenticateResponseError':
|
||||||
r_err = authenticateServerResp[1]
|
r_err = authenticateServerResp[1]
|
||||||
#r_err['transactionId']
|
#r_err['transactionId']
|
||||||
@@ -510,7 +509,7 @@ class SmDppHttpServer:
|
|||||||
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
||||||
|
|
||||||
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
||||||
print("EID (from eUICC cert): %s" % ss.eid)
|
logger.debug("EID (from eUICC cert): %s" % ss.eid)
|
||||||
|
|
||||||
# Verify EID is within permitted range of EUM certificate
|
# Verify EID is within permitted range of EUM certificate
|
||||||
if not validate_eid_range(ss.eid, eum_cert):
|
if not validate_eid_range(ss.eid, eum_cert):
|
||||||
@@ -589,7 +588,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
||||||
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
||||||
print("Rx %s: %s" % prepDownloadResp)
|
logger.debug("Rx %s: %s" % prepDownloadResp)
|
||||||
|
|
||||||
if prepDownloadResp[0] == 'downloadResponseError':
|
if prepDownloadResp[0] == 'downloadResponseError':
|
||||||
r_err = prepDownloadResp[1]
|
r_err = prepDownloadResp[1]
|
||||||
@@ -611,23 +610,23 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
# store otPK.EUICC.ECKA in session state
|
# store otPK.EUICC.ECKA in session state
|
||||||
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
||||||
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||||
|
|
||||||
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
||||||
# Reference value of CERT.DPpb.ECDDSA
|
# Reference value of CERT.DPpb.ECDDSA
|
||||||
print("curve = %s" % self.dp_pb.get_curve())
|
logger.debug("curve = %s" % self.dp_pb.get_curve())
|
||||||
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
||||||
# extract the public key in (hopefully) the right format for the ES8+ interface
|
# extract the public key in (hopefully) the right format for the ES8+ interface
|
||||||
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||||
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||||
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||||
|
|
||||||
ss.host_id = b'mahlzeit'
|
ss.host_id = b'mahlzeit'
|
||||||
|
|
||||||
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
||||||
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
||||||
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
||||||
print("shared_secret: %s" % b2h(ss.shared_secret))
|
logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
|
||||||
|
|
||||||
# TODO: Check if this order requires a Confirmation Code verification
|
# TODO: Check if this order requires a Confirmation Code verification
|
||||||
|
|
||||||
@@ -661,22 +660,22 @@ class SmDppHttpServer:
|
|||||||
request.setResponseCode(204)
|
request.setResponseCode(204)
|
||||||
pendingNotification_bin = b64decode(content['pendingNotification'])
|
pendingNotification_bin = b64decode(content['pendingNotification'])
|
||||||
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
||||||
print("Rx %s: %s" % pendingNotification)
|
logger.debug("Rx %s: %s" % pendingNotification)
|
||||||
if pendingNotification[0] == 'profileInstallationResult':
|
if pendingNotification[0] == 'profileInstallationResult':
|
||||||
profileInstallRes = pendingNotification[1]
|
profileInstallRes = pendingNotification[1]
|
||||||
pird = profileInstallRes['profileInstallationResultData']
|
pird = profileInstallRes['profileInstallationResultData']
|
||||||
transactionId = b2h(pird['transactionId'])
|
transactionId = b2h(pird['transactionId'])
|
||||||
ss = self.rss.get(transactionId, None)
|
ss = self.rss.get(transactionId, None)
|
||||||
if ss is None:
|
if ss is None:
|
||||||
print("Unable to find session for transactionId")
|
logger.warning(f"Unable to find session for transactionId: {transactionId}")
|
||||||
return
|
return None # Will return HTTP 204 with empty body
|
||||||
profileInstallRes['euiccSignPIR']
|
profileInstallRes['euiccSignPIR']
|
||||||
# TODO: use original data, don't re-encode?
|
# TODO: use original data, don't re-encode?
|
||||||
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||||
# verify eUICC signature
|
# verify eUICC signature
|
||||||
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
||||||
raise Exception('ECDSA signature verification failed on notification')
|
raise Exception('ECDSA signature verification failed on notification')
|
||||||
print("Profile Installation Final Result: ", pird['finalResult'])
|
logger.debug("Profile Installation Final Result: %s", pird['finalResult'])
|
||||||
# remove session state
|
# remove session state
|
||||||
del self.rss[transactionId]
|
del self.rss[transactionId]
|
||||||
elif pendingNotification[0] == 'otherSignedNotification':
|
elif pendingNotification[0] == 'otherSignedNotification':
|
||||||
@@ -701,7 +700,7 @@ class SmDppHttpServer:
|
|||||||
iccid = other_notif.get('iccid', None)
|
iccid = other_notif.get('iccid', None)
|
||||||
if iccid:
|
if iccid:
|
||||||
iccid = swap_nibbles(b2h(iccid))
|
iccid = swap_nibbles(b2h(iccid))
|
||||||
print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||||
else:
|
else:
|
||||||
raise ValueError(pendingNotification)
|
raise ValueError(pendingNotification)
|
||||||
|
|
||||||
@@ -714,7 +713,7 @@ class SmDppHttpServer:
|
|||||||
@rsp_api_wrapper
|
@rsp_api_wrapper
|
||||||
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
||||||
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
||||||
print("Rx JSON: %s" % content)
|
logger.debug("Rx JSON: %s" % content)
|
||||||
transactionId = content['transactionId']
|
transactionId = content['transactionId']
|
||||||
|
|
||||||
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
||||||
@@ -724,7 +723,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
||||||
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
||||||
print("Rx %s: %s" % cancelSessionResponse)
|
logger.debug("Rx %s: %s" % cancelSessionResponse)
|
||||||
|
|
||||||
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
||||||
# FIXME: print some error
|
# FIXME: print some error
|
||||||
@@ -760,8 +759,11 @@ def main(argv):
|
|||||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
||||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||||
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
||||||
|
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
|
||||||
|
|
||||||
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
||||||
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=False)
|
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=False)
|
||||||
if(args.nossl):
|
if(args.nossl):
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
|
|||||||
block_nr = self.block_nr
|
block_nr = self.block_nr
|
||||||
ciphertext = self._encrypt(padded_data)
|
ciphertext = self._encrypt(padded_data)
|
||||||
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
||||||
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
|
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
|
||||||
return ciphertext
|
return ciphertext
|
||||||
|
|
||||||
def decrypt(self, data:bytes) -> bytes:
|
def decrypt(self, data:bytes) -> bytes:
|
||||||
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
|
|||||||
temp_data = self.mac_chain + tag_and_length + data
|
temp_data = self.mac_chain + tag_and_length + data
|
||||||
old_mcv = self.mac_chain
|
old_mcv = self.mac_chain
|
||||||
c_mac = self._auth(temp_data)
|
c_mac = self._auth(temp_data)
|
||||||
|
|
||||||
|
# DEBUG: Show MAC computation details
|
||||||
|
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||||
|
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||||
|
|
||||||
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
||||||
ret = tag_and_length + data + c_mac
|
ret = tag_and_length + data + c_mac
|
||||||
|
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||||
|
|
||||||
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
||||||
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
|
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def verify(self, ciphertext: bytes) -> bool:
|
def verify(self, ciphertext: bytes) -> bool:
|
||||||
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
|
|||||||
s_enc = out[l:2*l]
|
s_enc = out[l:2*l]
|
||||||
s_mac = out[l*2:3*l]
|
s_mac = out[l*2:3*l]
|
||||||
|
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||||
|
|
||||||
return s_enc, s_mac, initial_mac_chaining_value
|
return s_enc, s_mac, initial_mac_chaining_value
|
||||||
|
|
||||||
|
|
||||||
@@ -231,9 +246,21 @@ class BspInstance:
|
|||||||
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||||
assert tag <= 255
|
assert tag <= 255
|
||||||
assert len(plaintext) <= self.max_payload_size
|
assert len(plaintext) <= self.max_payload_size
|
||||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
|
|
||||||
|
# DEBUG: Show what we're processing
|
||||||
|
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||||
|
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||||
|
|
||||||
|
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
|
||||||
ciphered = self.c_algo.encrypt(plaintext)
|
ciphered = self.c_algo.encrypt(plaintext)
|
||||||
|
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||||
|
|
||||||
maced = self.m_algo.auth(tag, ciphered)
|
maced = self.m_algo.auth(tag, ciphered)
|
||||||
|
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||||
|
|
||||||
return maced
|
return maced
|
||||||
|
|
||||||
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
||||||
|
|||||||
@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
|
|||||||
from pySim.esim.bsp import BspInstance
|
from pySim.esim.bsp import BspInstance
|
||||||
from pySim.esim import PMO
|
from pySim.esim import PMO
|
||||||
|
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
||||||
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
||||||
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
||||||
@@ -196,8 +199,12 @@ class BoundProfilePackage(ProfilePackage):
|
|||||||
# 'initialiseSecureChannelRequest'
|
# 'initialiseSecureChannelRequest'
|
||||||
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
||||||
# firstSequenceOf87
|
# firstSequenceOf87
|
||||||
|
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||||
|
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||||
|
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||||
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
||||||
# sequenceOF88
|
# sequenceOF88
|
||||||
|
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||||
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
||||||
|
|
||||||
if self.ppp: # we have to use session keys
|
if self.ppp: # we have to use session keys
|
||||||
|
|||||||
Reference in New Issue
Block a user