7 Commits

Author SHA1 Message Date
Eric Wild
4e27b5107b smdpp: proper checks, proper check order, test mode
Change-Id: Ie4db65594b2eaf6c95ffc5e73ff9ae61c4d9d3a3
2025-06-25 10:25:35 +02:00
Eric Wild
0d24f35776 memory backed ephermal session store for easy concurrent runs
Change-Id: I05bfd6ff471ccf1c8c2b5f2b748b9d4125ddd4f7
2025-06-25 10:22:42 +02:00
Eric Wild
89e6e0b0bc smdpp: fix asn1tool OBJECT IDENTIFIER decoding
Change-Id: Ic678e6c4a4c1a01de87a8dce26f4a5e452e8562a
2025-06-25 10:22:42 +02:00
Eric Wild
3196f2fadf smdpp: add proper brp cert support
Change-Id: I6906732f7d193a9c2234075f4a82df5e0ed46100
2025-06-25 10:22:42 +02:00
Eric Wild
f98b1a0080 smdpp: verify cert chain
Change-Id: I1e4e4b1b032dc6a8b7d15bd80d533a50fe0cff15
2025-06-25 10:22:42 +02:00
Eric Wild
dc5fdd34bf x509 cert: fix weird cert check
Change-Id: I18beab0e1b24579724704c4141a2c457b2d4cf99
2025-06-25 10:22:42 +02:00
Eric Wild
67995146eb smdpp: less verbose by default
Those data blobs are huge.

Change-Id: I04a72b8f52417862d4dcba1f0743700dd942ef49
2025-06-25 10:22:42 +02:00
7 changed files with 1069 additions and 138 deletions

3
.gitignore vendored
View File

@@ -7,9 +7,10 @@
/.local /.local
/build /build
/pySim.egg-info /pySim.egg-info
/smdpp-data/sm-dp-sessions /smdpp-data/sm-dp-sessions*
dist dist
tags tags
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
smdpp-data/generated smdpp-data/generated
smdpp-data/certs/dhparam2048.pem smdpp-data/certs/dhparam2048.pem

File diff suppressed because it is too large Load Diff

View File

@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
block_nr = self.block_nr block_nr = self.block_nr
ciphertext = self._encrypt(padded_data) ciphertext = self._encrypt(padded_data)
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s", logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext)) block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
return ciphertext return ciphertext
def decrypt(self, data:bytes) -> bytes: def decrypt(self, data:bytes) -> bytes:
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
temp_data = self.mac_chain + tag_and_length + data temp_data = self.mac_chain + tag_and_length + data
old_mcv = self.mac_chain old_mcv = self.mac_chain
c_mac = self._auth(temp_data) c_mac = self._auth(temp_data)
# DEBUG: Show MAC computation details
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value. # The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
ret = tag_and_length + data + c_mac ret = tag_and_length + data + c_mac
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s", logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret)) tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
return ret return ret
def verify(self, ciphertext: bytes) -> bool: def verify(self, ciphertext: bytes) -> bool:
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
s_enc = out[l:2*l] s_enc = out[l:2*l]
s_mac = out[l*2:3*l] s_mac = out[l*2:3*l]
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
return s_enc, s_mac, initial_mac_chaining_value return s_enc, s_mac, initial_mac_chaining_value
@@ -231,9 +246,21 @@ class BspInstance:
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext.""" """Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
assert tag <= 255 assert tag <= 255
assert len(plaintext) <= self.max_payload_size assert len(plaintext) <= self.max_payload_size
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
# DEBUG: Show what we're processing
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
ciphered = self.c_algo.encrypt(plaintext) ciphered = self.c_algo.encrypt(plaintext)
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
maced = self.m_algo.auth(tag, ciphered) maced = self.m_algo.auth(tag, ciphered)
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
return maced return maced
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]: def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:

View File

@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
from pySim.esim.bsp import BspInstance from pySim.esim.bsp import BspInstance
from pySim.esim import PMO from pySim.esim import PMO
import logging
logger = logging.getLogger(__name__)
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before # Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature # signing, but we have to build parts of it separately first, then sign that, so we can put the signature
# into the same sequence as the signed data. We use the existing pySim TLV code for this. # into the same sequence as the signed data. We use the existing pySim TLV code for this.
@@ -196,8 +199,12 @@ class BoundProfilePackage(ProfilePackage):
# 'initialiseSecureChannelRequest' # 'initialiseSecureChannelRequest'
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr) bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
# firstSequenceOf87 # firstSequenceOf87
logger.debug(f"BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin)) bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
# sequenceOF88 # sequenceOF88
logger.debug(f"BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin)) bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
if self.ppp: # we have to use session keys if self.ppp: # we have to use session keys

View File

@@ -94,9 +94,57 @@ class RspSessionState:
self.__dict__.update(state) self.__dict__.update(state)
class RspSessionStore(shelve.DbfilenameShelf): class RspSessionStore:
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might """A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId.""" Can be configured to use either file-based storage or in-memory storage.
We use it to store RspSessionState objects indexed by transactionId."""
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
self._in_memory = in_memory
if in_memory:
self._shelf = shelve.Shelf(dict())
else:
if filename is None:
raise ValueError("filename is required for file-based session store")
self._shelf = shelve.open(filename)
# dunder magic
def __getitem__(self, key):
return self._shelf[key]
def __setitem__(self, key, value):
self._shelf[key] = value
def __delitem__(self, key):
del self._shelf[key]
def __contains__(self, key):
return key in self._shelf
def __iter__(self):
return iter(self._shelf)
def __len__(self):
return len(self._shelf)
# everything else
def __getattr__(self, name):
"""Delegate attribute access to the underlying shelf object."""
return getattr(self._shelf, name)
def close(self):
"""Close the session store."""
if hasattr(self._shelf, 'close'):
self._shelf.close()
if self._in_memory:
# For in-memory store, clear the reference
self._shelf = None
def sync(self):
"""Synchronize the cache with the underlying storage."""
if hasattr(self._shelf, 'sync'):
self._shelf.sync()
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes: def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This """Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This

View File

@@ -25,6 +25,7 @@ from cryptography.hazmat.primitives.serialization import load_pem_private_key, E
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from pySim.utils import b2h from pySim.utils import b2h
from . import x509_err
def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool: def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool:
"""Verify if 'signed' certificate was signed using 'signer'.""" """Verify if 'signed' certificate was signed using 'signer'."""
@@ -64,9 +65,6 @@ class oid:
id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6') id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7') id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
class VerifyError(Exception):
"""An error during certificate verification,"""
class CertificateSet: class CertificateSet:
"""A set of certificates consisting of a trusted [self-signed] CA root certificate, """A set of certificates consisting of a trusted [self-signed] CA root certificate,
and an optional number of intermediate certificates. Can be used to verify the certificate chain and an optional number of intermediate certificates. Can be used to verify the certificate chain
@@ -135,7 +133,7 @@ class CertificateSet:
# we cannot check if there's no CRL # we cannot check if there's no CRL
return return
if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr): if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr):
raise VerifyError('Certificate is present in CRL, verification failed') raise x509_err.CertificateRevoked()
def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100): def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100):
"""Verify if a given certificate's signature chain can be traced back to the root CA of this """Verify if a given certificate's signature chain can be traced back to the root CA of this
@@ -149,14 +147,14 @@ class CertificateSet:
check_signed(c, self.root_cert) check_signed(c, self.root_cert)
return return
parent_cert = self.intermediate_certs.get(aki, None) parent_cert = self.intermediate_certs.get(aki, None)
if not aki: if not parent_cert:
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki)) raise x509_err.MissingIntermediateCert(b2h(aki))
check_signed(c, parent_cert) check_signed(c, parent_cert)
# if we reach here, we passed (no exception raised) # if we reach here, we passed (no exception raised)
c = parent_cert c = parent_cert
depth += 1 depth += 1
if depth > max_depth: if depth > max_depth:
raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth) raise x509_err.MaxDepthExceeded(max_depth, depth)
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes: def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:

58
pySim/esim/x509_err.py Normal file
View File

@@ -0,0 +1,58 @@
"""X.509 certificate verification exceptions for GSMA eSIM."""
class VerifyError(Exception):
"""Base class for certificate verification errors."""
pass
class MissingIntermediateCert(VerifyError):
"""Raised when an intermediate certificate in the chain cannot be found."""
def __init__(self, auth_key_id: str):
self.auth_key_id = auth_key_id
super().__init__(f'Could not find intermediate certificate for AuthKeyId {auth_key_id}')
class CertificateRevoked(VerifyError):
"""Raised when a certificate is found in the CRL."""
def __init__(self, cert_serial: str = None):
self.cert_serial = cert_serial
msg = 'Certificate is present in CRL, verification failed'
if cert_serial:
msg += f' (serial: {cert_serial})'
super().__init__(msg)
class MaxDepthExceeded(VerifyError):
"""Raised when certificate chain depth exceeds the maximum allowed."""
def __init__(self, max_depth: int, actual_depth: int):
self.max_depth = max_depth
self.actual_depth = actual_depth
super().__init__(f'Maximum depth {max_depth} exceeded while verifying certificate chain (actual: {actual_depth})')
class SignatureVerification(VerifyError):
"""Raised when certificate signature verification fails."""
def __init__(self, cert_subject: str = None, signer_subject: str = None):
self.cert_subject = cert_subject
self.signer_subject = signer_subject
msg = 'Certificate signature verification failed'
if cert_subject and signer_subject:
msg += f': {cert_subject} not signed by {signer_subject}'
super().__init__(msg)
class InvalidCertificate(VerifyError):
"""Raised when a certificate is invalid (missing required fields, wrong type, etc)."""
def __init__(self, reason: str):
self.reason = reason
super().__init__(f'Invalid certificate: {reason}')
class CertificateExpired(VerifyError):
"""Raised when a certificate has expired."""
def __init__(self, cert_subject: str = None):
self.cert_subject = cert_subject
msg = 'Certificate has expired'
if cert_subject:
msg += f': {cert_subject}'
super().__init__(msg)