diff --git a/osmo-smdpp.py b/osmo-smdpp.py index c3fd6d57..30514165 100755 --- a/osmo-smdpp.py +++ b/osmo-smdpp.py @@ -47,6 +47,9 @@ from pySim.esim.es8p import * from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError +import logging +logger = logging.getLogger(__name__) + # HACK: make this configurable DATA_DIR = './smdpp-data' HOSTNAME = 'testsmdpplus1.example.com' # must match certificates! @@ -82,18 +85,18 @@ def get_eum_certificate_variant(eum_cert) -> str: for policy in cert_policies_ext.value: policy_oid = policy.policy_identifier.dotted_string - print(f"Found certificate policy: {policy_oid}") + logger.debug(f"Found certificate policy: {policy_oid}") if policy_oid == '2.23.146.1.2.1.2': - print("Detected EUM certificate variant: O (old)") + logger.debug("Detected EUM certificate variant: O (old)") return 'O' elif policy_oid == '2.23.146.1.2.1.0.0.0': - print("Detected EUM certificate variant: Ov3/A/B/C (new)") + logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)") return 'NEW' except x509.ExtensionNotFound: - print("No Certificate Policies extension found") + logger.debug("No Certificate Policies extension found") except Exception as e: - print(f"Error checking certificate policies: {e}") + logger.debug(f"Error checking certificate policies: {e}") def parse_permitted_eins_from_cert(eum_cert) -> List[str]: """Extract permitted IINs from EUM certificate using the appropriate method @@ -106,17 +109,15 @@ def parse_permitted_eins_from_cert(eum_cert) -> List[str]: if cert_variant == 'O': # Old variant - use nameConstraints extension - print("Using nameConstraints parsing for variant O certificate") permitted_iins.extend(_parse_name_constraints_eins(eum_cert)) else: # New variants (Ov3, A, B, C) - use GSMA permittedEins extension - print("Using GSMA permittedEins parsing for newer certificate variant") permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert)) unique_iins = list(set(permitted_iins)) - print(f"Total unique permitted IINs found: {len(unique_iins)}") + logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}") return unique_iins def _parse_gsma_permitted_eins(eum_cert) -> List[str]: @@ -130,7 +131,7 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]: for ext in eum_cert.extensions: if ext.oid == permitted_eins_oid: - print(f"Found GSMA permittedEins extension: {ext.oid}") + logger.debug(f"Found GSMA permittedEins extension: {ext.oid}") # Get the DER-encoded extension value ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value @@ -156,14 +157,14 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]: all(c in '0123456789ABCDEF' for c in iin_clean) and len(iin_clean) % 2 == 0): permitted_iins.append(iin_clean) - print(f"Found permitted IIN (GSMA): {iin_clean}") + logger.debug(f"Found permitted IIN (GSMA): {iin_clean}") else: - print(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})") + logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})") except Exception as e: - print(f"Error parsing GSMA permittedEins extension: {e}") + logger.debug(f"Error parsing GSMA permittedEins extension: {e}") except Exception as e: - print(f"Error accessing GSMA certificate extensions: {e}") + logger.debug(f"Error accessing GSMA certificate extensions: {e}") return permitted_iins @@ -178,13 +179,11 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]: x509.oid.ExtensionOID.NAME_CONSTRAINTS ) - print("Found nameConstraints extension (variant O)") name_constraints = name_constraints_ext.value # Check permittedSubtrees for IIN constraints if name_constraints.permitted_subtrees: for subtree in name_constraints.permitted_subtrees: - print(f"Processing permitted subtree: {subtree}") if isinstance(subtree, x509.DirectoryName): for attribute in subtree.value: @@ -196,12 +195,12 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]: all(c in '0123456789ABCDEF' for c in serial_value) and len(serial_value) % 2 == 0): permitted_iins.append(serial_value) - print(f"Found permitted IIN (nameConstraints/DN): {serial_value}") + logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}") except x509.ExtensionNotFound: - print("No nameConstraints extension found") + logger.debug("No nameConstraints extension found") except Exception as e: - print(f"Error parsing nameConstraints: {e}") + logger.debug(f"Error parsing nameConstraints: {e}") return permitted_iins @@ -209,29 +208,29 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]: def validate_eid_range(eid: str, eum_cert) -> bool: """Validate that EID is within the permitted EINs of the EUM certificate.""" if not eid or len(eid) != 32: - print(f"Invalid EID format: {eid}") + logger.debug(f"Invalid EID format: {eid}") return False try: permitted_eins = parse_permitted_eins_from_cert(eum_cert) if not permitted_eins: - print("Warning: No permitted EINs found in EUM certificate") + logger.debug("Warning: No permitted EINs found in EUM certificate") return False eid_normalized = eid.upper() - print(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs") + logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs") for permitted_ein in permitted_eins: if eid_normalized.startswith(permitted_ein): - print(f"EID {eid_normalized} matches permitted EIN {permitted_ein}") + logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}") return True - print(f"EID {eid_normalized} is not in any permitted EIN list") + logger.debug(f"EID {eid_normalized} is not in any permitted EIN list") return False except Exception as e: - print(f"Error validating EID: {e}") + logger.debug(f"Error validating EID: {e}") return False def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict: @@ -299,11 +298,11 @@ class SmDppHttpServer: def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]: """Find CI certificate for given key identifier.""" for cert in self.ci_certs: - print("cert: %s" % cert) + logger.debug("cert: %s" % cert) subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions)) - print(subject_exts) + logger.debug(subject_exts) subject_pkid = subject_exts[0].value - print(subject_pkid) + logger.debug(subject_pkid) if subject_pkid and subject_pkid.key_identifier == ci_pkid: return cert return None @@ -356,7 +355,7 @@ class SmDppHttpServer: validate_request_headers(request) content = json.loads(request.content.read()) - print("Rx JSON: %s" % json.dumps(content)) + logger.debug("Rx JSON: %s" % json.dumps(content)) set_headers(request) output = func(self, request, content) @@ -364,7 +363,7 @@ class SmDppHttpServer: return '' build_resp_header(output) - print("Tx JSON: %s" % json.dumps(output)) + logger.debug("Tx JSON: %s" % json.dumps(output)) return json.dumps(output) return _api_wrapper @@ -383,7 +382,7 @@ class SmDppHttpServer: euiccInfo1_bin = b64decode(content['euiccInfo1']) euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin) - print("Rx euiccInfo1: %s" % euiccInfo1) + logger.debug("Rx euiccInfo1: %s" % euiccInfo1) #euiccInfo1['svn'] # TODO: If euiccCiPKIdListForSigningV3 is present ... @@ -427,9 +426,9 @@ class SmDppHttpServer: 'serverAddress': self.server_hostname, 'serverChallenge': serverChallenge, } - print("Tx serverSigned1: %s" % serverSigned1) + logger.debug("Tx serverSigned1: %s" % serverSigned1) serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1) - print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin)) + logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin)) output = {} output['serverSigned1'] = b64encode2str(serverSigned1_bin) @@ -458,7 +457,7 @@ class SmDppHttpServer: authenticateServerResp_bin = b64decode(content['authenticateServerResponse']) authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin) - print("Rx %s: %s" % authenticateServerResp) + logger.debug("Rx %s: %s" % authenticateServerResp) if authenticateServerResp[0] == 'authenticateResponseError': r_err = authenticateServerResp[1] #r_err['transactionId'] @@ -510,7 +509,7 @@ class SmDppHttpServer: raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)') ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value - print("EID (from eUICC cert): %s" % ss.eid) + logger.debug("EID (from eUICC cert): %s" % ss.eid) # Verify EID is within permitted range of EUM certificate if not validate_eid_range(ss.eid, eum_cert): @@ -589,7 +588,7 @@ class SmDppHttpServer: prepDownloadResp_bin = b64decode(content['prepareDownloadResponse']) prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin) - print("Rx %s: %s" % prepDownloadResp) + logger.debug("Rx %s: %s" % prepDownloadResp) if prepDownloadResp[0] == 'downloadResponseError': r_err = prepDownloadResp[1] @@ -611,23 +610,23 @@ class SmDppHttpServer: # store otPK.EUICC.ECKA in session state ss.euicc_otpk = euiccSigned2['euiccOtpk'] - print("euiccOtpk: %s" % (b2h(ss.euicc_otpk))) + logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk))) # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter # Reference value of CERT.DPpb.ECDDSA - print("curve = %s" % self.dp_pb.get_curve()) + logger.debug("curve = %s" % self.dp_pb.get_curve()) ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve()) # extract the public key in (hopefully) the right format for the ES8+ interface ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) - print("smdpOtpk: %s" % b2h(ss.smdp_otpk)) - print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption()))) + logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk)) + logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption()))) ss.host_id = b'mahlzeit' # Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk) ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key) - print("shared_secret: %s" % b2h(ss.shared_secret)) + logger.debug("shared_secret: %s" % b2h(ss.shared_secret)) # TODO: Check if this order requires a Confirmation Code verification @@ -661,22 +660,22 @@ class SmDppHttpServer: request.setResponseCode(204) pendingNotification_bin = b64decode(content['pendingNotification']) pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin) - print("Rx %s: %s" % pendingNotification) + logger.debug("Rx %s: %s" % pendingNotification) if pendingNotification[0] == 'profileInstallationResult': profileInstallRes = pendingNotification[1] pird = profileInstallRes['profileInstallationResultData'] transactionId = b2h(pird['transactionId']) ss = self.rss.get(transactionId, None) if ss is None: - print("Unable to find session for transactionId") - return + logger.warning(f"Unable to find session for transactionId: {transactionId}") + return None # Will return HTTP 204 with empty body profileInstallRes['euiccSignPIR'] # TODO: use original data, don't re-encode? pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird) # verify eUICC signature if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin): raise Exception('ECDSA signature verification failed on notification') - print("Profile Installation Final Result: ", pird['finalResult']) + logger.debug("Profile Installation Final Result: %s", pird['finalResult']) # remove session state del self.rss[transactionId] elif pendingNotification[0] == 'otherSignedNotification': @@ -701,7 +700,7 @@ class SmDppHttpServer: iccid = other_notif.get('iccid', None) if iccid: iccid = swap_nibbles(b2h(iccid)) - print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid)) + logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid)) else: raise ValueError(pendingNotification) @@ -714,7 +713,7 @@ class SmDppHttpServer: @rsp_api_wrapper def cancelSession(self, request: IRequest, content: dict) -> dict: """See ES9+ CancelSession in SGP.22 Section 5.6.5""" - print("Rx JSON: %s" % content) + logger.debug("Rx JSON: %s" % content) transactionId = content['transactionId'] # Verify that the received transactionId is known and relates to an ongoing RSP session @@ -724,7 +723,7 @@ class SmDppHttpServer: cancelSessionResponse_bin = b64decode(content['cancelSessionResponse']) cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin) - print("Rx %s: %s" % cancelSessionResponse) + logger.debug("Rx %s: %s" % cancelSessionResponse) if cancelSessionResponse[0] == 'cancelSessionResponseError': # FIXME: print some error @@ -760,8 +759,11 @@ def main(argv): parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000) parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs") parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False) + parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False) args = parser.parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING) + common_cert_path = os.path.join(DATA_DIR, args.certdir) hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=False) if(args.nossl): diff --git a/pySim/esim/bsp.py b/pySim/esim/bsp.py index fb4c0b37..70e4a1f4 100644 --- a/pySim/esim/bsp.py +++ b/pySim/esim/bsp.py @@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC): block_nr = self.block_nr ciphertext = self._encrypt(padded_data) logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s", - block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext)) + block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20]) return ciphertext def decrypt(self, data:bytes) -> bytes: @@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC): temp_data = self.mac_chain + tag_and_length + data old_mcv = self.mac_chain c_mac = self._auth(temp_data) + + # DEBUG: Show MAC computation details + logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}") + logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}") + logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}") + logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}") + logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}") + # The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value. ret = tag_and_length + data + c_mac + logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}") + logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s", - tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret)) + tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20]) return ret def verify(self, ciphertext: bytes) -> bool: @@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos s_enc = out[l:2*l] s_mac = out[l*2:3*l] + logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}") + logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}") + logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}") + logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}") + return s_enc, s_mac, initial_mac_chaining_value @@ -231,9 +246,21 @@ class BspInstance: """Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext.""" assert tag <= 255 assert len(plaintext) <= self.max_payload_size - logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)) + + # DEBUG: Show what we're processing + logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})") + logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}") + logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}") + logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}") + + logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20]) ciphered = self.c_algo.encrypt(plaintext) + logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}") + maced = self.m_algo.auth(tag, ciphered) + logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}") + logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}") + return maced def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]: diff --git a/pySim/esim/es8p.py b/pySim/esim/es8p.py index 8bd7e14b..9c76084f 100644 --- a/pySim/esim/es8p.py +++ b/pySim/esim/es8p.py @@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp from pySim.esim.bsp import BspInstance from pySim.esim import PMO +import logging +logger = logging.getLogger(__name__) + # Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before # signing, but we have to build parts of it separately first, then sign that, so we can put the signature # into the same sequence as the signed data. We use the existing pySim TLV code for this. @@ -196,8 +199,12 @@ class BoundProfilePackage(ProfilePackage): # 'initialiseSecureChannelRequest' bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr) # firstSequenceOf87 + logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys") + logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}") + logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}") bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin)) # sequenceOF88 + logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys") bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin)) if self.ppp: # we have to use session keys