c++ bpp verification code with pybind11

needs ext building:
$ python3 setup.py build_ext --inplace
This commit is contained in:
Eric Wild
2025-05-22 16:27:14 +02:00
parent c7c48718ba
commit 9ff35c651f
31 changed files with 1626 additions and 70 deletions

3
.gitignore vendored
View File

@@ -10,3 +10,6 @@
/smdpp-data/sm-dp-sessions
dist
tags
*.so
dhparam2048.pem
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem

1076
bsp_python_bindings.cpp Normal file

File diff suppressed because it is too large Load Diff

188
bsp_test_integration.py Normal file
View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
# All Rights Reserved
#
# Author: Eric Wild <ewild@sysmocom.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Integrates C++ BSP implementation for testing getBoundProfilePackage in osmo-smdpp.py
"""
import os
import sys
from typing import Dict, List, Optional, Tuple
from osmocom.utils import h2b, b2h
from osmocom.tlv import bertlv_parse_one_rawtag, bertlv_return_one_rawtlv
import base64
try:
import bsp_crypto
CPP_BSP_AVAILABLE = True
print("C++ BSP module loaded successfully")
except ImportError as e:
CPP_BSP_AVAILABLE = False
print(f"C++ BSP module not available: {e} - Please compile the C++ extension with: python setup.py build_ext --inplace")
class BspTestIntegration:
"""Integration class for testing BSP functionality with C++ implementation"""
def __init__(self):
self.cpp_available = CPP_BSP_AVAILABLE
def parse_bound_profile_package(self, bpp_der: bytes) -> Dict:
def split_bertlv_sequence(sequence: bytes) -> List[bytes]:
"""Split a SEQUENCE OF into individual TLV elements"""
remainder = sequence
ret = []
while remainder:
_tag, _l, tlv, remainder = bertlv_return_one_rawtlv(remainder)
ret.append(tlv)
return ret
# outer BoundProfilePackage structure
tag, _l, v, _remainder = bertlv_parse_one_rawtag(bpp_der)
if len(_remainder):
raise ValueError('Excess data at end of BPP TLV')
if tag != 0xbf36:
raise ValueError(f'Unexpected BPP outer tag: 0x{tag:x}')
result = {}
# InitialiseSecureChannelRequest
tag, _l, iscr_bin, remainder = bertlv_return_one_rawtlv(v)
if tag != 0xbf23: # Expected tag for InitialiseSecureChannelRequest
raise ValueError(f"Unexpected ISCR tag: 0x{tag:x}")
result['iscr'] = iscr_bin
# firstSequenceOf87 (ConfigureISDP)
tag, _l, firstSeqOf87, remainder = bertlv_parse_one_rawtag(remainder)
if tag != 0xa0:
raise ValueError(f"Unexpected 'firstSequenceOf87' tag: 0x{tag:x}")
result['firstSequenceOf87'] = split_bertlv_sequence(firstSeqOf87)
# sequenceOf88 (StoreMetadata)
tag, _l, seqOf88, remainder = bertlv_parse_one_rawtag(remainder)
if tag != 0xa1:
raise ValueError(f"Unexpected 'sequenceOf88' tag: 0x{tag:x}")
result['sequenceOf88'] = split_bertlv_sequence(seqOf88)
# optional secondSequenceOf87 or sequenceOf86
tag, _l, tlv, remainder = bertlv_parse_one_rawtag(remainder)
if tag == 0xa2: # secondSequenceOf87 (ReplaceSessionKeys)
result['secondSequenceOf87'] = split_bertlv_sequence(tlv)
# sequenceOf86
tag2, _l, seqOf86, remainder = bertlv_parse_one_rawtag(remainder)
if tag2 != 0xa3:
raise ValueError(f"Unexpected 'sequenceOf86' tag: 0x{tag2:x}")
result['sequenceOf86'] = split_bertlv_sequence(seqOf86)
elif tag == 0xa3: # straight sequenceOf86 (no ReplaceSessionKeys)
result['secondSequenceOf87'] = []
result['sequenceOf86'] = split_bertlv_sequence(tlv)
else:
raise ValueError(f"Unexpected tag after sequenceOf88: 0x{tag:x}")
if remainder:
raise ValueError("Unexpected data after BPP structure")
return result
def verify_bound_profile_package(self,
shared_secret: bytes,
key_type: int,
key_length: int,
host_id: bytes,
eid: bytes,
bpp_der: bytes,
expected_configure_isdp: Optional[bytes] = None,
expected_store_metadata: Optional[bytes] = None,
expected_profile_data: Optional[bytes] = None) -> Dict:
if not self.cpp_available:
raise RuntimeError("C++ BSP module not available")
parsed = self.parse_bound_profile_package(bpp_der)
print(f"BPP_VERIFY: Parsed BPP with {len(parsed['firstSequenceOf87'])} ConfigureISDP segments")
print(f"BPP_VERIFY: {len(parsed['sequenceOf88'])} StoreMetadata segments")
print(f"BPP_VERIFY: {len(parsed['secondSequenceOf87'])} ReplaceSessionKeys segments")
print(f"BPP_VERIFY: {len(parsed['sequenceOf86'])} profile data segments")
# Convert bytes to lists for C++ - just to be safe
shared_secret_list = list(shared_secret)
host_id_list = list(host_id)
eid_bytes_list = list(eid)
bsp = bsp_crypto.BspCrypto.from_kdf(shared_secret_list, key_type, key_length, host_id_list, eid_bytes_list)
try:
# result = bsp.process_bound_profile_package(
# parsed['firstSequenceOf87'][0],
# parsed['sequenceOf88'][0],
# parsed['secondSequenceOf87'][0],
# parsed['sequenceOf86'][0]
# )
result = bsp.process_bound_profile_package2(bpp_der)
verification_result = {
'success': True,
'error': None,
'configureIsdp': bytes(result['configureIsdp']),
'storeMetadata': bytes(result['storeMetadata']),
'profileData': bytes(result['profileData']),
'hasReplaceSessionKeys': result['hasReplaceSessionKeys']
}
if result['hasReplaceSessionKeys']:
rsk = result['replaceSessionKeys']
verification_result['replaceSessionKeys'] = {
'ppkEnc': bytes(rsk['ppkEnc']),
'ppkCmac': bytes(rsk['ppkCmac']),
'initialMacChainingValue': bytes(rsk['initialMacChainingValue'])
}
verification_result['verification'] = {}
if expected_configure_isdp is not None:
verification_result['verification']['configureIsdp'] = (
verification_result['configureIsdp'] == expected_configure_isdp
)
if expected_store_metadata is not None:
verification_result['verification']['storeMetadata'] = (
verification_result['storeMetadata'] == expected_store_metadata
)
if expected_profile_data is not None:
verification_result['verification']['profileData'] = (
verification_result['profileData'] == expected_profile_data
)
print("BPP_VERIFY: Successfully processed BoundProfilePackage")
print(f"BPP_VERIFY: ConfigureISDP: {len(verification_result['configureIsdp'])} bytes")
print(f"BPP_VERIFY: StoreMetadata: {len(verification_result['storeMetadata'])} bytes")
print(f"BPP_VERIFY: ProfileData: {len(verification_result['profileData'])} bytes")
print(f"BPP_VERIFY: Has ReplaceSessionKeys: {verification_result['hasReplaceSessionKeys']}")
return verification_result
except Exception as e:
return {
'success': False,
'error': str(e),
'configureIsdp': None,
'storeMetadata': None,
'profileData': None,
'hasReplaceSessionKeys': False
}

View File

@@ -17,6 +17,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
import json
import sys
import argparse
@@ -49,11 +55,187 @@ def b64encode2str(req: bytes) -> str:
"""Encode given input bytes as base64 and return result as string."""
return base64.b64encode(req).decode('ascii')
def set_headers(request: IRequest):
"""Set the request headers as mandatory by GSMA eSIM RSP."""
request.setHeader('Content-Type', 'application/json;charset=UTF-8')
request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
def validate_request_headers(request: IRequest):
"""Validate mandatory HTTP headers according to SGP.22."""
content_type = request.getHeader('Content-Type')
if not content_type or not content_type.startswith('application/json'):
raise ApiError('1.2.1', '2.1', 'Invalid Content-Type header')
admin_protocol = request.getHeader('X-Admin-Protocol')
if admin_protocol and not admin_protocol.startswith('gsma/rsp/v'):
raise ApiError('1.2.2', '2.1', 'Unsupported X-Admin-Protocol version')
def get_eum_certificate_variant(eum_cert) -> str:
"""Determine EUM certificate variant by checking Certificate Policies extension.
Returns 'O' for old variant, or 'NEW' for Ov3/A/B/C variants."""
try:
cert_policies_ext = eum_cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.CERTIFICATE_POLICIES
)
for policy in cert_policies_ext.value:
policy_oid = policy.policy_identifier.dotted_string
print(f"Found certificate policy: {policy_oid}")
if policy_oid == '2.23.146.1.2.1.2':
print("Detected EUM certificate variant: O (old)")
return 'O'
elif policy_oid == '2.23.146.1.2.1.0.0.0':
print("Detected EUM certificate variant: Ov3/A/B/C (new)")
return 'NEW'
except x509.ExtensionNotFound:
print("No Certificate Policies extension found")
except Exception as e:
print(f"Error checking certificate policies: {e}")
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
"""Extract permitted IINs from EUM certificate using the appropriate method
based on certificate variant (O vs Ov3/A/B/C).
Returns list of permitted IINs (basically prefixes that valid EIDs must start with)."""
# Determine certificate variant first
cert_variant = get_eum_certificate_variant(eum_cert)
permitted_iins = []
if cert_variant == 'O':
# Old variant - use nameConstraints extension
print("Using nameConstraints parsing for variant O certificate")
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
else:
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
print("Using GSMA permittedEins parsing for newer certificate variant")
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
unique_iins = list(set(permitted_iins))
print(f"Total unique permitted IINs found: {len(unique_iins)}")
return unique_iins
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
"""Parse the GSMA permittedEins extension using correct ASN.1 structure.
PermittedEins ::= SEQUENCE OF PrintableString
Each string contains an IIN (Issuer Identification Number) - a prefix of valid EIDs."""
permitted_iins = []
try:
permitted_eins_oid = x509.ObjectIdentifier('2.23.146.1.2.2.0') # sgp26: 2.23.146.1.2.2.0 = ASN1:SEQUENCE:permittedEins
for ext in eum_cert.extensions:
if ext.oid == permitted_eins_oid:
print(f"Found GSMA permittedEins extension: {ext.oid}")
# Get the DER-encoded extension value
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
if isinstance(ext_der, bytes):
try:
import asn1tools
permitted_eins_schema = """
PermittedEins DEFINITIONS ::= BEGIN
PermittedEins ::= SEQUENCE OF PrintableString
END
"""
decoder = asn1tools.compile_string(permitted_eins_schema)
decoded_strings = decoder.decode('PermittedEins', ext_der)
for iin_string in decoded_strings:
# Each string contains an IIN -> prefix of euicc EID
iin_clean = iin_string.strip().upper()
# IINs is 8 chars per sgp22, var len according to sgp29, fortunately we don't care
if (len(iin_clean) == 8 and
all(c in '0123456789ABCDEF' for c in iin_clean) and
len(iin_clean) % 2 == 0):
permitted_iins.append(iin_clean)
print(f"Found permitted IIN (GSMA): {iin_clean}")
else:
print(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
except Exception as e:
print(f"Error parsing GSMA permittedEins extension: {e}")
except Exception as e:
print(f"Error accessing GSMA certificate extensions: {e}")
return permitted_iins
def _parse_name_constraints_eins(eum_cert) -> List[str]:
"""Parse permitted IINs from nameConstraints extension (variant O)."""
permitted_iins = []
try:
# Look for nameConstraints extension
name_constraints_ext = eum_cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.NAME_CONSTRAINTS
)
print("Found nameConstraints extension (variant O)")
name_constraints = name_constraints_ext.value
# Check permittedSubtrees for IIN constraints
if name_constraints.permitted_subtrees:
for subtree in name_constraints.permitted_subtrees:
print(f"Processing permitted subtree: {subtree}")
if isinstance(subtree, x509.DirectoryName):
for attribute in subtree.value:
# IINs for O in serialNumber
if attribute.oid == x509.oid.NameOID.SERIAL_NUMBER:
serial_value = attribute.value.upper()
# sgp22 8, sgp29 var len, fortunately we don't care
if (len(serial_value) == 8 and
all(c in '0123456789ABCDEF' for c in serial_value) and
len(serial_value) % 2 == 0):
permitted_iins.append(serial_value)
print(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
except x509.ExtensionNotFound:
print("No nameConstraints extension found")
except Exception as e:
print(f"Error parsing nameConstraints: {e}")
return permitted_iins
def validate_eid_range(eid: str, eum_cert) -> bool:
"""Validate that EID is within the permitted EINs of the EUM certificate."""
if not eid or len(eid) != 32:
print(f"Invalid EID format: {eid}")
return False
try:
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
if not permitted_eins:
print("Warning: No permitted EINs found in EUM certificate")
return False
eid_normalized = eid.upper()
print(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
for permitted_ein in permitted_eins:
if eid_normalized.startswith(permitted_ein):
print(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
return True
print(f"EID {eid_normalized} is not in any permitted EIN list")
return False
except Exception as e:
print(f"Error validating EID: {e}")
return False
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
r = {'subjectCode': subject_code, 'reasonCode': reason_code}
if subject_id:
@@ -72,12 +254,6 @@ def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_da
if status_code_data:
js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from cryptography import x509
def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
"""convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
@@ -116,9 +292,9 @@ class SmDppHttpServer:
with open(os.path.join(dirpath, filename), 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read())
if cert:
# verify it is a CI certificate (keyCertSign + i-rspRole-ci)
if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
# # verify it is a CI certificate (keyCertSign + i-rspRole-ci)
# if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
# raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
certs.append(cert)
return certs
@@ -134,6 +310,20 @@ class SmDppHttpServer:
return cert
return None
def validate_certificate_chain_for_verification(self, euicc_ci_pkid_list: List[bytes]) -> bool:
"""Validate that SM-DP+ has valid certificate chains for the given CI PKIDs."""
for ci_pkid in euicc_ci_pkid_list:
ci_cert = self.ci_get_cert_for_pkid(ci_pkid)
if ci_cert:
# Check if our DPauth certificate chains to this CI
try:
cs = CertificateSet(ci_cert)
cs.verify_cert_chain(self.dp_auth.cert)
return True
except VerifyError:
continue
return False
def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
self.server_hostname = server_hostname
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
@@ -179,11 +369,10 @@ class SmDppHttpServer:
functionality, such as JSON decoding/encoding and debug-printing."""
@functools.wraps(func)
def _api_wrapper(self, request: IRequest):
# TODO: evaluate User-Agent + X-Admin-Protocol header
# TODO: reject any non-JSON Content-type
validate_request_headers(request)
content = json.loads(request.content.read())
print("Rx JSON: %s" % json.dumps(content))
# print("Rx JSON: %s" % json.dumps(content))
set_headers(request)
output = func(self, request, content)
@@ -191,7 +380,7 @@ class SmDppHttpServer:
return ''
build_resp_header(output)
print("Tx JSON: %s" % json.dumps(output))
# print("Tx JSON: %s" % json.dumps(output))
return json.dumps(output)
return _api_wrapper
@@ -218,6 +407,12 @@ class SmDppHttpServer:
pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
# Validate that SM-DP+ supports certificate chains for verification
# verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
# if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
ci_cert = None
for x in pkid_list:
@@ -232,13 +427,6 @@ class SmDppHttpServer:
if not ci_cert:
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
# TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
# * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
# supported by the eUICC (indicated by euiccCiPKIdListForVerification).
# * Using a certificate chain that the eUICC and the LPA both support:
#euiccInfo1['euiccCiPKIdListForVerification']
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
# SHALL be unique within the scope and lifetime of each SM-DP+.
transactionId = uuid.uuid4().hex.upper()
@@ -330,17 +518,18 @@ class SmDppHttpServer:
raise ApiError('8.1.3', '6.1', 'Verification failed (certificate chain)')
# raise ApiError('8.1.3', '6.3', 'Expired')
# Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
# Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
# TODO: verify EID of eUICC cert is within permitted range of EUM cert
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
print("EID (from eUICC cert): %s" % ss.eid)
# Verify EID is within permitted range of EUM certificate
if not validate_eid_range(ss.eid, eum_cert):
raise ApiError('8.1.4', '6.1', 'EID is not within the permitted range of the EUM certificate')
# Verify that the serverChallenge attached to the ongoing RSP session matches the
# serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
# Verification failed".
@@ -444,8 +633,8 @@ class SmDppHttpServer:
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
# extract the public key in (hopefully) the right format for the ES8+ interface
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
# print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
# print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
ss.host_id = b'mahlzeit'
@@ -472,10 +661,24 @@ class SmDppHttpServer:
# update non-volatile state with updated ss object
self.rss[transactionId] = ss
return {
rv = {
'transactionId': transactionId,
'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
}
import bsp_test_integration as integ
integration = integ.BspTestIntegration()
bpp_der = base64.b64decode(rv['boundProfilePackage']) #.decode('ascii')
verification = integration.verify_bound_profile_package(
shared_secret=ss.shared_secret,
key_type=0x88,
key_length=16,
host_id=ss.host_id,
eid=h2b(ss.eid),
bpp_der=bpp_der
)
assert verification['success'], f"BPP verification failed: {verification['error']}"
return rv
@app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
@rsp_api_wrapper
@@ -588,8 +791,55 @@ def main(argv):
args = parser.parse_args()
hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=False)
#hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
hs.app.run(args.host, args.port)
# hs.app.run(HOSTNAME,endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives import serialization
from pathlib import Path
cert_derpath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.der'
cert_pempath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.pem'
cert_skpath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'SK_S_SM_DP_TLS_NIST.pem'
dhparam_path = Path("dhparam2048.pem")
if not dhparam_path.exists():
print("Generating dh params, this takes a few seconds..")
# Generate DH parameters with 2048-bit key size and generator 2
parameters = dh.generate_parameters(generator=2, key_size=2048)
pem_data = parameters.parameter_bytes(encoding=serialization.Encoding.PEM,format=serialization.ParameterFormat.PKCS3)
with open(dhparam_path, 'wb') as file:
file.write(pem_data)
print("DH params created successfully")
if not cert_pempath.exists():
print("Translating tls server cert from DER to PEM..")
with open(cert_derpath, 'rb') as der_file:
der_cert_data = der_file.read()
cert = x509.load_der_x509_certificate(der_cert_data)
pem_cert = cert.public_bytes(serialization.Encoding.PEM) #.decode('utf-8')
with open(cert_pempath, 'wb') as pem_file:
pem_file.write(pem_cert)
SERVER_STRING = f'ssl:8000:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}'
print(SERVER_STRING)
hs.app.run(HOSTNAME, endpoint_description=SERVER_STRING)
# hs.app.run(args.host, args.port)
if __name__ == "__main__":
main(sys.argv)
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ cp -a ../sgp26/SGP.26_v1.5_Certificates_18_07_2024/SGP.26_v1.5-2024_files/Valid\ Test\ Cases/SM-DP+/DPtls/CERT_S_SM_DP_TLS_NIST.der .
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ cp -a ../sgp26/SGP.26_v1.5_Certificates_18_07_2024/SGP.26_v1.5-2024_files/Valid\ Test\ Cases/SM-DP+/DPtls/SK_S_SM_DP_TLS_NIST.pem .
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ openssl x509 -inform der -in CERT_S_SM_DP_TLS_NIST.der -out CERT_S_SM_DP_TLS_NIST.pem
# cp -a Variants\ A_B_C/CI/CERT_CI_SIG_* ../pysim/smdpp-data/certs/CertificateIssuer
# cp -a Variants\ A_B_C/CI_subCA/CERT_*_SIG_* ../pysim/smdpp-data/certs/CertificateIssuer
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPauth/CERT* ../pysim/smdpp-data/certs/DPauth
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPpb/CERT* ../pysim/smdpp-data/certs/DPpb
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPtls/CERT* ../pysim/smdpp-data/certs/DPtls
# cp -a Variants\ A_B_C/Variant\ C/EUM_SUB_CA/CERT_EUMSubCA_VARC_SIG_* ../pysim/smdpp-data/certs/intermediate

View File

@@ -149,8 +149,18 @@ class BspAlgoMac(BspAlgo, abc.ABC):
temp_data = self.mac_chain + tag_and_length + data
old_mcv = self.mac_chain
c_mac = self._auth(temp_data)
# DEBUG: Show MAC computation details
print(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
print(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
print(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
print(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
print(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
ret = tag_and_length + data + c_mac
print(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
return ret
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
s_enc = out[l:2*l]
s_mac = out[l*2:3*l]
print(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
print(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
print(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
print(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
return s_enc, s_mac, initial_mac_chaining_value
@@ -231,9 +246,21 @@ class BspInstance:
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
assert tag <= 255
assert len(plaintext) <= self.max_payload_size
# DEBUG: Show what we're processing
print(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
print(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
print(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
print(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
ciphered = self.c_algo.encrypt(plaintext)
print(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
maced = self.m_algo.auth(tag, ciphered)
print(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
print(f"BSP_DEBUG: final_result_len: {len(maced)}")
return maced
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:

View File

@@ -196,8 +196,12 @@ class BoundProfilePackage(ProfilePackage):
# 'initialiseSecureChannelRequest'
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
# firstSequenceOf87
print(f"BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
print(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
print(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
# sequenceOF88
print(f"BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
if self.ppp: # we have to use session keys

View File

@@ -1,3 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
requires = ["setuptools", "wheel", "pybind11"]
build-backend = "setuptools.build_meta"

View File

@@ -15,3 +15,8 @@ git+https://github.com/osmocom/asn1tools
packaging
git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
pybind11
klein
service-identity
pyopenssl
requests

View File

@@ -1,4 +1,15 @@
from setuptools import setup
from pybind11.setup_helpers import Pybind11Extension, build_ext
ext_modules = [
Pybind11Extension(
"bsp_crypto",
["bsp_python_bindings.cpp"],
libraries=["ssl", "crypto"],
extra_compile_args=["-ggdb", "-O0"],
cxx_std=17,
),
]
setup(
name='pySim',
@@ -34,6 +45,11 @@ setup(
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
"asn1tools",
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
"pybind11",
"klein",
"service-identity",
"pyopenssl",
"requests",
],
scripts=[
'pySim-prog.py',
@@ -49,4 +65,8 @@ setup(
'asn1/saip/*.asn',
],
},
ext_modules=ext_modules,
cmdclass={"build_ext": build_ext},
zip_safe=False,
python_requires=">=3.6",
)

View File

@@ -1 +0,0 @@
0D AL¶þV¿eRÌÍìAˆHÊt£×ôͺ„nìE<Nåû R¤~&Àk\þ~­ ÉRlÜÛ°Ÿ‰¥7ì¶NŒŽmWø

View File

@@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICgjCCAiigAwIBAgIBCjAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
SVQwHhcNMjUwNDIzMTUyMzA1WhcNMzUwNDIxMTUyMzA1WjAzMQ0wCwYDVQQKDARB
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
DwEB/wQEAwIHgDAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwFAYD
VR0gBA0wCzAJBgdngRIBAgEDMB0GA1UdDgQWBBQn/vHyKRh+x4Pt9uApZKRRjVfU
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
RQIgYakBZP6y9/2iu9aZkgb89SQgfRb0TKVMGElWgtyoX2gCIQCT8o/TkAxhWCTY
yaBDMi1L9Ub+93ef5s+S+eHLmwuudA==
-----END CERTIFICATE-----