Compare commits
1 Commits
daniel/ota
...
ewild/ppk_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ff35c651f |
7
.gitignore
vendored
7
.gitignore
vendored
@@ -7,10 +7,9 @@
|
||||
/.local
|
||||
/build
|
||||
/pySim.egg-info
|
||||
/smdpp-data/sm-dp-sessions*
|
||||
/smdpp-data/sm-dp-sessions
|
||||
dist
|
||||
tags
|
||||
*.so
|
||||
dhparam2048.pem
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
|
||||
smdpp-data/generated
|
||||
smdpp-data/certs/dhparam2048.pem
|
||||
|
||||
1076
bsp_python_bindings.cpp
Normal file
1076
bsp_python_bindings.cpp
Normal file
File diff suppressed because it is too large
Load Diff
188
bsp_test_integration.py
Normal file
188
bsp_test_integration.py
Normal file
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Eric Wild <ewild@sysmocom.de>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
"""
|
||||
Integrates C++ BSP implementation for testing getBoundProfilePackage in osmo-smdpp.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from osmocom.utils import h2b, b2h
|
||||
from osmocom.tlv import bertlv_parse_one_rawtag, bertlv_return_one_rawtlv
|
||||
import base64
|
||||
|
||||
try:
|
||||
import bsp_crypto
|
||||
CPP_BSP_AVAILABLE = True
|
||||
print("C++ BSP module loaded successfully")
|
||||
except ImportError as e:
|
||||
CPP_BSP_AVAILABLE = False
|
||||
print(f"C++ BSP module not available: {e} - Please compile the C++ extension with: python setup.py build_ext --inplace")
|
||||
|
||||
class BspTestIntegration:
|
||||
"""Integration class for testing BSP functionality with C++ implementation"""
|
||||
|
||||
def __init__(self):
|
||||
self.cpp_available = CPP_BSP_AVAILABLE
|
||||
|
||||
def parse_bound_profile_package(self, bpp_der: bytes) -> Dict:
|
||||
def split_bertlv_sequence(sequence: bytes) -> List[bytes]:
|
||||
"""Split a SEQUENCE OF into individual TLV elements"""
|
||||
remainder = sequence
|
||||
ret = []
|
||||
while remainder:
|
||||
_tag, _l, tlv, remainder = bertlv_return_one_rawtlv(remainder)
|
||||
ret.append(tlv)
|
||||
return ret
|
||||
|
||||
# outer BoundProfilePackage structure
|
||||
tag, _l, v, _remainder = bertlv_parse_one_rawtag(bpp_der)
|
||||
if len(_remainder):
|
||||
raise ValueError('Excess data at end of BPP TLV')
|
||||
if tag != 0xbf36:
|
||||
raise ValueError(f'Unexpected BPP outer tag: 0x{tag:x}')
|
||||
|
||||
result = {}
|
||||
|
||||
# InitialiseSecureChannelRequest
|
||||
tag, _l, iscr_bin, remainder = bertlv_return_one_rawtlv(v)
|
||||
if tag != 0xbf23: # Expected tag for InitialiseSecureChannelRequest
|
||||
raise ValueError(f"Unexpected ISCR tag: 0x{tag:x}")
|
||||
result['iscr'] = iscr_bin
|
||||
|
||||
# firstSequenceOf87 (ConfigureISDP)
|
||||
tag, _l, firstSeqOf87, remainder = bertlv_parse_one_rawtag(remainder)
|
||||
if tag != 0xa0:
|
||||
raise ValueError(f"Unexpected 'firstSequenceOf87' tag: 0x{tag:x}")
|
||||
result['firstSequenceOf87'] = split_bertlv_sequence(firstSeqOf87)
|
||||
|
||||
# sequenceOf88 (StoreMetadata)
|
||||
tag, _l, seqOf88, remainder = bertlv_parse_one_rawtag(remainder)
|
||||
if tag != 0xa1:
|
||||
raise ValueError(f"Unexpected 'sequenceOf88' tag: 0x{tag:x}")
|
||||
result['sequenceOf88'] = split_bertlv_sequence(seqOf88)
|
||||
|
||||
# optional secondSequenceOf87 or sequenceOf86
|
||||
tag, _l, tlv, remainder = bertlv_parse_one_rawtag(remainder)
|
||||
if tag == 0xa2: # secondSequenceOf87 (ReplaceSessionKeys)
|
||||
result['secondSequenceOf87'] = split_bertlv_sequence(tlv)
|
||||
# sequenceOf86
|
||||
tag2, _l, seqOf86, remainder = bertlv_parse_one_rawtag(remainder)
|
||||
if tag2 != 0xa3:
|
||||
raise ValueError(f"Unexpected 'sequenceOf86' tag: 0x{tag2:x}")
|
||||
result['sequenceOf86'] = split_bertlv_sequence(seqOf86)
|
||||
elif tag == 0xa3: # straight sequenceOf86 (no ReplaceSessionKeys)
|
||||
result['secondSequenceOf87'] = []
|
||||
result['sequenceOf86'] = split_bertlv_sequence(tlv)
|
||||
else:
|
||||
raise ValueError(f"Unexpected tag after sequenceOf88: 0x{tag:x}")
|
||||
|
||||
if remainder:
|
||||
raise ValueError("Unexpected data after BPP structure")
|
||||
|
||||
return result
|
||||
|
||||
def verify_bound_profile_package(self,
|
||||
shared_secret: bytes,
|
||||
key_type: int,
|
||||
key_length: int,
|
||||
host_id: bytes,
|
||||
eid: bytes,
|
||||
bpp_der: bytes,
|
||||
expected_configure_isdp: Optional[bytes] = None,
|
||||
expected_store_metadata: Optional[bytes] = None,
|
||||
expected_profile_data: Optional[bytes] = None) -> Dict:
|
||||
if not self.cpp_available:
|
||||
raise RuntimeError("C++ BSP module not available")
|
||||
|
||||
parsed = self.parse_bound_profile_package(bpp_der)
|
||||
|
||||
print(f"BPP_VERIFY: Parsed BPP with {len(parsed['firstSequenceOf87'])} ConfigureISDP segments")
|
||||
print(f"BPP_VERIFY: {len(parsed['sequenceOf88'])} StoreMetadata segments")
|
||||
print(f"BPP_VERIFY: {len(parsed['secondSequenceOf87'])} ReplaceSessionKeys segments")
|
||||
print(f"BPP_VERIFY: {len(parsed['sequenceOf86'])} profile data segments")
|
||||
|
||||
# Convert bytes to lists for C++ - just to be safe
|
||||
shared_secret_list = list(shared_secret)
|
||||
host_id_list = list(host_id)
|
||||
eid_bytes_list = list(eid)
|
||||
|
||||
bsp = bsp_crypto.BspCrypto.from_kdf(shared_secret_list, key_type, key_length, host_id_list, eid_bytes_list)
|
||||
|
||||
try:
|
||||
# result = bsp.process_bound_profile_package(
|
||||
# parsed['firstSequenceOf87'][0],
|
||||
# parsed['sequenceOf88'][0],
|
||||
# parsed['secondSequenceOf87'][0],
|
||||
# parsed['sequenceOf86'][0]
|
||||
# )
|
||||
|
||||
result = bsp.process_bound_profile_package2(bpp_der)
|
||||
|
||||
verification_result = {
|
||||
'success': True,
|
||||
'error': None,
|
||||
'configureIsdp': bytes(result['configureIsdp']),
|
||||
'storeMetadata': bytes(result['storeMetadata']),
|
||||
'profileData': bytes(result['profileData']),
|
||||
'hasReplaceSessionKeys': result['hasReplaceSessionKeys']
|
||||
}
|
||||
|
||||
if result['hasReplaceSessionKeys']:
|
||||
rsk = result['replaceSessionKeys']
|
||||
verification_result['replaceSessionKeys'] = {
|
||||
'ppkEnc': bytes(rsk['ppkEnc']),
|
||||
'ppkCmac': bytes(rsk['ppkCmac']),
|
||||
'initialMacChainingValue': bytes(rsk['initialMacChainingValue'])
|
||||
}
|
||||
|
||||
verification_result['verification'] = {}
|
||||
if expected_configure_isdp is not None:
|
||||
verification_result['verification']['configureIsdp'] = (
|
||||
verification_result['configureIsdp'] == expected_configure_isdp
|
||||
)
|
||||
if expected_store_metadata is not None:
|
||||
verification_result['verification']['storeMetadata'] = (
|
||||
verification_result['storeMetadata'] == expected_store_metadata
|
||||
)
|
||||
if expected_profile_data is not None:
|
||||
verification_result['verification']['profileData'] = (
|
||||
verification_result['profileData'] == expected_profile_data
|
||||
)
|
||||
|
||||
print("BPP_VERIFY: Successfully processed BoundProfilePackage")
|
||||
print(f"BPP_VERIFY: ConfigureISDP: {len(verification_result['configureIsdp'])} bytes")
|
||||
print(f"BPP_VERIFY: StoreMetadata: {len(verification_result['storeMetadata'])} bytes")
|
||||
print(f"BPP_VERIFY: ProfileData: {len(verification_result['profileData'])} bytes")
|
||||
print(f"BPP_VERIFY: Has ReplaceSessionKeys: {verification_result['hasReplaceSessionKeys']}")
|
||||
|
||||
return verification_result
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'configureIsdp': None,
|
||||
'storeMetadata': None,
|
||||
'profileData': None,
|
||||
'hasReplaceSessionKeys': False
|
||||
}
|
||||
@@ -24,12 +24,20 @@ import argparse
|
||||
from Cryptodome.Cipher import AES
|
||||
from osmocom.utils import h2b, b2h, Hexstr
|
||||
|
||||
from pySim.card_key_provider import CardKeyFieldCryptor
|
||||
from pySim.card_key_provider import CardKeyProviderCsv
|
||||
|
||||
class CsvColumnEncryptor(CardKeyFieldCryptor):
|
||||
def dict_keys_to_upper(d: dict) -> dict:
|
||||
return {k.upper():v for k,v in d.items()}
|
||||
|
||||
class CsvColumnEncryptor:
|
||||
def __init__(self, filename: str, transport_keys: dict):
|
||||
self.filename = filename
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
self.transport_keys = dict_keys_to_upper(transport_keys)
|
||||
|
||||
def encrypt_col(self, colname:str, value: str) -> Hexstr:
|
||||
key = self.transport_keys[colname]
|
||||
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
|
||||
return b2h(cipher.encrypt(h2b(value)))
|
||||
|
||||
def encrypt(self) -> None:
|
||||
with open(self.filename, 'r') as infile:
|
||||
@@ -41,8 +49,9 @@ class CsvColumnEncryptor(CardKeyFieldCryptor):
|
||||
cw.writeheader()
|
||||
|
||||
for row in cr:
|
||||
for fieldname in cr.fieldnames:
|
||||
row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname])
|
||||
for key_colname in self.transport_keys:
|
||||
if key_colname in row:
|
||||
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
|
||||
cw.writerow(row)
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -62,5 +71,9 @@ if __name__ == "__main__":
|
||||
print("You must specify at least one key!")
|
||||
sys.exit(1)
|
||||
|
||||
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
|
||||
for name, key in csv_column_keys.items():
|
||||
print("Encrypting column %s using AES key %s" % (name, key))
|
||||
|
||||
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
|
||||
cce.encrypt()
|
||||
|
||||
@@ -24,7 +24,7 @@ ICCID_HELP='The ICCID of the eSIM that shall be made available'
|
||||
MATCHID_HELP='MatchingID that shall be used by profile download'
|
||||
|
||||
parser = argparse.ArgumentParser(description="""
|
||||
Utility to manually issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
|
||||
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
|
||||
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
|
||||
@@ -63,7 +63,7 @@ if __name__ == '__main__':
|
||||
data = {}
|
||||
for k, v in vars(opts).items():
|
||||
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
|
||||
# remove keys from dict that should not end up in JSON...
|
||||
# remove keys from dict that shold not end up in JSON...
|
||||
continue
|
||||
if v is not None:
|
||||
data[k] = v
|
||||
|
||||
@@ -68,7 +68,7 @@ parser_dl.add_argument('--confirmation-code',
|
||||
# notification
|
||||
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
|
||||
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
|
||||
help='Profile Management Operation whoise occurrence shall be notififed')
|
||||
help='Profile Management Opreation whoise occurrence shall be notififed')
|
||||
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
|
||||
help='eUICC global notification sequence number')
|
||||
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
||||
@@ -123,8 +123,8 @@ class Es9pClient:
|
||||
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
|
||||
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
|
||||
}
|
||||
if self.opts.iccid:
|
||||
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
|
||||
if opts.iccid:
|
||||
ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
|
||||
|
||||
if self.opts.operation == 'download':
|
||||
pird = {
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse
|
||||
from osmocom.utils import h2b, swap_nibbles
|
||||
from pySim.esim.es8p import ProfileMetadata
|
||||
|
||||
parser = argparse.ArgumentParser(description="""Utility program to generate profile metadata in the
|
||||
StoreMetadataRequest format based on input values from the command line.""")
|
||||
parser.add_argument('--iccid', required=True, help="ICCID of eSIM profile");
|
||||
parser.add_argument('--spn', required=True, help="Service Provider Name");
|
||||
parser.add_argument('--profile-name', required=True, help="eSIM Profile Name");
|
||||
parser.add_argument('--profile-class', choices=['test', 'operational', 'provisioning'],
|
||||
default='operational', help="Profile Class");
|
||||
parser.add_argument('--outfile', required=True, help="Output File Name");
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
iccid_bin = h2b(swap_nibbles(opts.iccid))
|
||||
pmd = ProfileMetadata(iccid_bin, spn=opts.spn, profile_name=opts.profile_name,
|
||||
profile_class=opts.profile_class)
|
||||
|
||||
with open(opts.outfile, 'wb') as f:
|
||||
f.write(pmd.gen_store_metadata_request())
|
||||
print("Written StoreMetadataRequest to '%s'" % opts.outfile)
|
||||
@@ -1,661 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Faithfully reproduces the smdpp certs contained in SGP.26_v1.5_Certificates_18_07_2024.zip
|
||||
available at https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/
|
||||
Only usable for testing, it obviously uses a different CI key.
|
||||
"""
|
||||
|
||||
import os
|
||||
import binascii
|
||||
from datetime import datetime
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
# Custom OIDs used in certificates
|
||||
OID_CERTIFICATE_POLICIES_CI = "2.23.146.1.2.1.0" # CI cert policy
|
||||
OID_CERTIFICATE_POLICIES_TLS = "2.23.146.1.2.1.3" # DPtls cert policy
|
||||
OID_CERTIFICATE_POLICIES_AUTH = "2.23.146.1.2.1.4" # DPauth cert policy
|
||||
OID_CERTIFICATE_POLICIES_PB = "2.23.146.1.2.1.5" # DPpb cert policy
|
||||
|
||||
# Subject Alternative Name OIDs
|
||||
OID_CI_RID = "2.999.1" # CI Registered ID
|
||||
OID_DP_RID = "2.999.10" # DP+ Registered ID
|
||||
OID_DP2_RID = "2.999.12" # DP+2 Registered ID
|
||||
OID_DP4_RID = "2.999.14" # DP+4 Registered ID
|
||||
OID_DP8_RID = "2.999.18" # DP+8 Registered ID
|
||||
|
||||
|
||||
class SimplifiedCertificateGenerator:
|
||||
def __init__(self):
|
||||
self.backend = default_backend()
|
||||
# Store generated CI keys to sign other certs
|
||||
self.ci_certs = {} # {"BRP": cert, "NIST": cert}
|
||||
self.ci_keys = {} # {"BRP": key, "NIST": key}
|
||||
|
||||
def get_curve(self, curve_type):
|
||||
"""Get the appropriate curve object."""
|
||||
if curve_type == "BRP":
|
||||
return ec.BrainpoolP256R1()
|
||||
else:
|
||||
return ec.SECP256R1()
|
||||
|
||||
def generate_key_pair(self, curve):
|
||||
"""Generate a new EC key pair."""
|
||||
private_key = ec.generate_private_key(curve, self.backend)
|
||||
return private_key
|
||||
|
||||
def load_private_key_from_hex(self, hex_key, curve):
|
||||
"""Load EC private key from hex string."""
|
||||
key_bytes = binascii.unhexlify(hex_key.replace(":", "").replace(" ", "").replace("\n", ""))
|
||||
key_int = int.from_bytes(key_bytes, 'big')
|
||||
return ec.derive_private_key(key_int, curve, self.backend)
|
||||
|
||||
def generate_ci_cert(self, curve_type):
|
||||
"""Generate CI certificate for either BRP or NIST curve."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.generate_key_pair(curve)
|
||||
|
||||
# Build subject and issuer (self-signed) - same for both
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test CI"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "TESTCERT"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSPTEST"),
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
|
||||
])
|
||||
|
||||
# Build certificate - all parameters same for both
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(issuer)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 8, 27, 51))
|
||||
builder = builder.not_valid_after(datetime(2055, 4, 1, 8, 27, 51))
|
||||
builder = builder.serial_number(0xb874f3abfa6c44d3)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
# Add extensions - all same for both
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=None),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_CI),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=True,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(OID_CI_RID))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(private_key, hashes.SHA256(), self.backend)
|
||||
|
||||
self.ci_keys[curve_type] = private_key
|
||||
self.ci_certs[curve_type] = certificate
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_dp_cert(self, curve_type, subject_cn, serial, key_hex,
|
||||
cert_policy_oid, rid_oid, validity_start, validity_end):
|
||||
"""Generate a DP certificate signed by CI - works for both BRP and NIST."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(cert_policy_oid),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_tls_cert(self, curve_type, subject_cn, dns_name, serial, key_hex,
|
||||
rid_oid, validity_start, validity_end):
|
||||
"""Generate a TLS certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.ExtendedKeyUsage([
|
||||
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
|
||||
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_TLS),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.DNSName(dns_name),
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_eum_cert(self, curve_type, key_hex):
|
||||
"""Generate EUM certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "EUM Test"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 28, 37))
|
||||
builder = builder.not_valid_after(datetime(2054, 3, 24, 9, 28, 37))
|
||||
builder = builder.serial_number(0x12345678)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.2"), # EUM policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier("2.999.5"))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=0),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
# Name Constraints
|
||||
constrained_name = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032"),
|
||||
])
|
||||
|
||||
name_constraints = x509.NameConstraints(
|
||||
permitted_subtrees=[
|
||||
x509.DirectoryName(constrained_name)
|
||||
],
|
||||
excluded_subtrees=None
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
name_constraints,
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_euicc_cert(self, curve_type, eum_cert, eum_key, key_hex):
|
||||
"""Generate eUICC certificate signed by EUM."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032123451234512345678901235"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test eUICC"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(eum_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 48, 58))
|
||||
builder = builder.not_valid_after(datetime(7496, 1, 24, 9, 48, 58))
|
||||
builder = builder.serial_number(0x0200000000000001)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(eum_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.1"), # eUICC policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(eum_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def save_cert_and_key(self, cert, key, cert_path_der, cert_path_pem, key_path_sk, key_path_pk):
|
||||
"""Save certificate and key in various formats."""
|
||||
# Create directories if needed
|
||||
os.makedirs(os.path.dirname(cert_path_der), exist_ok=True)
|
||||
|
||||
with open(cert_path_der, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.DER))
|
||||
|
||||
if cert_path_pem:
|
||||
with open(cert_path_pem, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
if key and key_path_sk:
|
||||
with open(key_path_sk, "wb") as f:
|
||||
f.write(key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
))
|
||||
|
||||
if key and key_path_pk:
|
||||
with open(key_path_pk, "wb") as f:
|
||||
f.write(key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
))
|
||||
|
||||
|
||||
def main():
|
||||
gen = SimplifiedCertificateGenerator()
|
||||
|
||||
output_dir = "smdpp-data/generated"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
print("=== Generating CI Certificates ===")
|
||||
|
||||
for curve_type in ["BRP", "NIST"]:
|
||||
ci_cert, ci_key = gen.generate_ci_cert(curve_type)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
ci_cert, ci_key,
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.der",
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.pem",
|
||||
None, None
|
||||
)
|
||||
print(f"Generated CI {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPauth Certificates ===")
|
||||
|
||||
dpauth_configs = [
|
||||
("BRP", "TEST SM-DP+", 256, "93:fb:33:d0:58:4f:34:9b:07:f8:b5:d2:af:93:d7:c3:e3:54:b3:49:a3:b9:13:50:2e:6a:bc:07:0e:4d:49:29", OID_DP_RID, "DPauth"),
|
||||
("NIST", "TEST SM-DP+", 256, "0a:7c:c1:c2:44:e6:0c:52:cd:5b:78:07:ab:8c:36:0c:26:52:46:01:50:7d:ca:bc:5d:d5:98:b5:a6:16:d5:d5", OID_DP_RID, "DPauth"),
|
||||
("BRP", "TEST SM-DP+2", 512, "0c:17:35:5c:01:1d:0f:e8:d7:da:dd:63:f1:97:85:cf:6c:51:cb:cd:46:6a:e8:8b:e8:f8:1b:c1:05:88:46:f6", OID_DP2_RID, "DP2auth"),
|
||||
("NIST", "TEST SM-DP+2", 512, "9c:32:a0:95:d4:88:42:d9:ff:a4:04:f7:12:51:2a:a2:c5:42:5a:1a:26:38:6a:b6:a1:45:d5:81:1e:03:91:41", OID_DP2_RID, "DP2auth"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dpauth_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_AUTH, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 31, 30),
|
||||
datetime(2030, 3, 30, 8, 31, 30)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPauth/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPauth/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPauth/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPpb Certificates ===")
|
||||
|
||||
dppb_configs = [
|
||||
("BRP", "TEST SM-DP+", 257, "75:ff:32:2f:41:66:16:da:e1:a4:84:ef:71:d4:87:4f:b0:df:32:95:fd:35:c2:cb:a4:89:fb:b2:bb:9c:7b:f6", OID_DP_RID, "DPpb"),
|
||||
("NIST", "TEST SM-DP+", 257, "dc:d6:94:b7:78:95:7e:8e:9a:dd:bd:d9:44:33:e9:ef:8f:73:d1:1e:49:1c:48:d4:25:a3:8a:94:91:bd:3b:ed", OID_DP_RID, "DPpb"),
|
||||
("BRP", "TEST SM-DP+2", 513, "9c:ae:2e:1a:56:07:a9:d5:78:38:2e:ee:93:2e:25:1f:52:30:4f:86:ee:b1:f1:70:8c:db:d3:c0:7b:e2:cd:3d", OID_DP2_RID, "DP2pb"),
|
||||
("NIST", "TEST SM-DP+2", 513, "66:93:11:49:63:9d:ba:ac:1d:c3:d3:06:c5:8b:d2:df:d2:2f:73:bf:63:ac:86:31:98:32:90:b5:7f:90:93:45", OID_DP2_RID, "DP2pb"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dppb_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_PB, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 34, 46),
|
||||
datetime(2030, 3, 30, 8, 34, 46)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPpb/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPpb/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPpb/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPtls Certificates ===")
|
||||
|
||||
dptls_configs = [
|
||||
("BRP", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "3f:67:15:28:02:b3:f4:c7:fa:e6:79:58:55:f6:82:54:1e:45:e3:5e:ff:f4:e8:a0:55:65:a0:f1:91:2a:78:2e", OID_DP_RID, "DP_TLS_BRP"),
|
||||
("NIST", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "a0:3e:7c:e4:55:04:74:be:a4:b7:a8:73:99:ce:5a:8c:9f:66:1b:68:0f:94:01:39:ff:f8:4e:9d:ec:6a:4d:8c", OID_DP_RID, "DP_TLS_NIST"),
|
||||
("NIST", "testsmdpplus2.example.com", "testsmdpplus2.example.com", 12, "4e:65:61:c6:40:88:f6:69:90:7a:db:e3:94:b1:1a:84:24:2e:03:3a:82:a8:84:02:31:63:6d:c9:1b:4e:e3:f5", OID_DP2_RID, "DP2_TLS"),
|
||||
("NIST", "testsmdpplus4.example.com", "testsmdpplus4.example.com", 14, "f2:65:9d:2f:52:8f:4b:11:37:40:d5:8a:0d:2a:f3:eb:2b:48:e1:22:c2:b6:0a:6a:f6:fc:96:ad:86:be:6f:a4", OID_DP4_RID, "DP4_TLS"),
|
||||
("NIST", "testsmdpplus8.example.com", "testsmdpplus8.example.com", 18, "ff:6e:4a:50:9b:ad:db:38:10:88:31:c2:3c:cc:2d:44:30:7a:f2:81:e9:25:96:7f:8c:df:1d:95:54:a0:28:8d", OID_DP8_RID, "DP8_TLS"),
|
||||
]
|
||||
|
||||
for curve_type, cn, dns, serial, key_hex, rid_oid, name_prefix in dptls_configs:
|
||||
cert, key = gen.generate_tls_cert(
|
||||
curve_type, cn, dns, serial, key_hex, rid_oid,
|
||||
datetime(2024, 7, 9, 15, 29, 36),
|
||||
datetime(2025, 8, 11, 15, 29, 36)
|
||||
)
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPtls/CERT_S_SM_{name_prefix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPtls/SK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem",
|
||||
f"{output_dir}/DPtls/PK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} certificate")
|
||||
|
||||
print("\n=== Generating EUM Certificates ===")
|
||||
|
||||
eum_configs = [
|
||||
("BRP", "12:9b:0a:b1:3f:17:e1:4a:40:b6:fa:4e:d8:23:e0:cf:46:5b:7b:3d:73:24:05:e6:29:5d:3b:23:b0:45:c9:9a"),
|
||||
("NIST", "25:e6:75:77:28:e1:e9:51:13:51:9c:dc:34:55:5c:29:ba:ed:23:77:3a:c5:af:dd:dc:da:d9:84:89:8a:52:f0"),
|
||||
]
|
||||
|
||||
eum_certs = {}
|
||||
eum_keys = {}
|
||||
|
||||
for curve_type, key_hex in eum_configs:
|
||||
cert, key = gen.generate_eum_cert(curve_type, key_hex)
|
||||
eum_certs[curve_type] = cert
|
||||
eum_keys[curve_type] = key
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/EUM/CERT_EUM{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/EUM/SK_EUM{suffix}.pem",
|
||||
f"{output_dir}/EUM/PK_EUM{suffix}.pem"
|
||||
)
|
||||
print(f"Generated EUM {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating eUICC Certificates ===")
|
||||
|
||||
euicc_configs = [
|
||||
("BRP", "8d:c3:47:a7:6d:b7:bd:d6:22:2d:d7:5e:a1:a1:68:8a:ca:81:1e:4c:bc:6a:7f:6a:ef:a4:b2:64:19:62:0b:90"),
|
||||
("NIST", "11:e1:54:67:dc:19:4f:33:71:83:e4:60:c9:f6:32:60:09:1e:12:e8:10:26:cd:65:61:e1:7c:6d:85:39:cc:9c"),
|
||||
]
|
||||
|
||||
for curve_type, key_hex in euicc_configs:
|
||||
cert, key = gen.generate_euicc_cert(curve_type, eum_certs[curve_type], eum_keys[curve_type], key_hex)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/eUICC/CERT_EUICC{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/eUICC/SK_EUICC{suffix}.pem",
|
||||
f"{output_dir}/eUICC/PK_EUICC{suffix}.pem"
|
||||
)
|
||||
print(f"Generated eUICC {curve_type} certificate")
|
||||
|
||||
print("\n=== Certificate generation complete! ===")
|
||||
print(f"All certificates saved to: {output_dir}/")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -82,6 +82,10 @@ case "$JOB_TYPE" in
|
||||
|
||||
pip install -r requirements.txt
|
||||
|
||||
# XXX: workaround for https://github.com/python-cmd2/cmd2/issues/1414
|
||||
# 2.4.3 was the last stable release not affected by this bug (OS#6776)
|
||||
pip install cmd2==2.4.3
|
||||
|
||||
rm -rf docs/_build
|
||||
make -C "docs" html latexpdf
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ parser_rpe.add_argument('--output-file', required=True, help='Output file name')
|
||||
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
|
||||
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
|
||||
|
||||
parser_rn = subparsers.add_parser('remove-naa', help='Remove specified NAAs from PE-Sequence')
|
||||
parser_rn = subparsers.add_parser('remove-naa', help='Remove speciifed NAAs from PE-Sequence')
|
||||
parser_rn.add_argument('--output-file', required=True, help='Output file name')
|
||||
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
|
||||
# TODO: add an --naa-index or the like, so only one given instance can be removed
|
||||
|
||||
@@ -27,5 +27,5 @@ PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH add-app-i
|
||||
# Display the contents of the resulting application PE:
|
||||
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
|
||||
|
||||
# For an explanation of --uicc-toolkit-app-spec-pars, see:
|
||||
# For an explaination of --uicc-toolkit-app-spec-pars, see:
|
||||
# ETSI TS 102 226, section 8.2.1.3.2.2.1
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# A more useful version of the 'unber' tool provided with asn1c:
|
||||
# A more useful verion of the 'unber' tool provided with asn1c:
|
||||
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
|
||||
|
||||
import sys
|
||||
|
||||
10
docs/conf.py
10
docs/conf.py
@@ -18,17 +18,9 @@ sys.path.insert(0, os.path.abspath('..'))
|
||||
# -- Project information -----------------------------------------------------
|
||||
|
||||
project = 'osmopysim-usermanual'
|
||||
copyright = '2009-2025 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
|
||||
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
|
||||
# manually as line break (https://github.com/sphinx-doc/sphinx/issues/6875)
|
||||
latex_elements = {
|
||||
"maketitle":
|
||||
r"""\author{Sylvain Munaut, Harald Welte, Philipp Maier, \and Supreeth Herle, Merlin Chlosta}
|
||||
\sphinxmaketitle
|
||||
"""
|
||||
}
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ Two modes are possible:
|
||||
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
|
||||
``--num`` remains unchanged.
|
||||
|
||||
The parameter ``--num`` specifies a card individual number. This number will be managed into the random seed so that
|
||||
The parameter ``--num`` specifies a card individual number. This number will be manged into the random seed so that
|
||||
it serves as an identifier for a particular set of randomly generated parameters.
|
||||
|
||||
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
|
||||
@@ -77,7 +77,7 @@ the parameter ``--type``. The following card types are supported:
|
||||
|
||||
Specifying the card reader:
|
||||
|
||||
It is most common to use ``pySim-prog`` together with a PCSC reader. The PCSC reader number is specified via the
|
||||
It is most common to use ``pySim-prog`` together whith a PCSC reader. The PCSC reader number is specified via the
|
||||
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
|
||||
the ``--help`` option of ``pySim-prog`` for more information.
|
||||
|
||||
|
||||
@@ -21,9 +21,9 @@ osmo-smdpp currently
|
||||
|
||||
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
|
||||
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
|
||||
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with matching eUICCs.
|
||||
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with mathcing eUICCs.
|
||||
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
|
||||
of the EID or whether it was downloaded before. This is actually very useful for R&D and testing, as it
|
||||
of the EID or whether it was donwloaded before. This is actually very useful for R&D and testing, as it
|
||||
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
|
||||
production usage.
|
||||
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
|
||||
@@ -40,21 +40,16 @@ osmo-smdpp currently
|
||||
Running osmo-smdpp
|
||||
------------------
|
||||
|
||||
osmo-smdpp comes with built-in TLS support which is enabled by default. However, it is always possible to
|
||||
disable the built-in TLS support if needed.
|
||||
|
||||
In order to use osmo-smdpp without the built-in TLS support, it has to be put behind a TLS reverse proxy,
|
||||
which terminates the ES9+ HTTPS traffic from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
||||
|
||||
NOTE: The built in TLS support in osmo-smdpp makes use of the python *twisted* framework. Older versions
|
||||
of this framework appear to have problems when using the example elliptic curve certificates (both NIST and
|
||||
Brainpool) from GSMA.
|
||||
osmo-smdpp does not have built-in TLS support as the used *twisted* framework appears to have
|
||||
problems when using the example elliptic curve certificates (both NIST and Brainpool) from GSMA.
|
||||
|
||||
So in order to use it, you have to put it behind a TLS reverse proxy, which terminates the ES9+
|
||||
HTTPS from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
||||
|
||||
nginx as TLS proxy
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If you chose to use `nginx` as TLS reverse proxy, you can use the following configuration snippet::
|
||||
If you use `nginx` as web server, you can use the following configuration snippet::
|
||||
|
||||
upstream smdpp {
|
||||
server localhost:8000;
|
||||
@@ -97,43 +92,32 @@ The `smdpp-data/upp` directory contains the UPP (Unprotected Profile Package) us
|
||||
commandline options
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Typically, you just run osmo-smdpp without any arguments, and it will bind its built-in HTTPS ES9+ interface to
|
||||
`localhost` TCP port 443. In this case an external TLS reverse proxy is not needed.
|
||||
Typically, you just run it without any arguments, and it will bind its plain-HTTP ES9+ interface to
|
||||
`localhost` TCP port 8000.
|
||||
|
||||
osmo-smdpp currently doesn't have any configuration file.
|
||||
|
||||
There are command line options for binding:
|
||||
|
||||
Bind the HTTPS ES9+ to a port other than 443::
|
||||
Bind the HTTP ES9+ to a port other than 8000::
|
||||
|
||||
./osmo-smdpp.py -p 8443
|
||||
|
||||
Disable the built-in TLS support and bind the plain-HTTP ES9+ to a port 8000::
|
||||
|
||||
./osmo-smdpp.py -p 8000 --nossl
|
||||
./osmo-smdpp.py -p 8001
|
||||
|
||||
Bind the HTTP ES9+ to a different local interface::
|
||||
|
||||
./osmo-smdpp.py -H 127.0.0.2
|
||||
./osmo-smdpp.py -H 127.0.0.1
|
||||
|
||||
DNS setup for your LPA
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
|
||||
|
||||
It must also accept the TLS certificates used by your TLS proxy. In case osmo-smdpp is used with built-in TLS support,
|
||||
it will use the certificates provided in smdpp-data.
|
||||
|
||||
NOTE: The HTTPS ES9+ interface cannot be addressed by the LPA directly via its IP address. The reason for this is that
|
||||
the included SGP.26 (DPtls) test certificates explicitly restrict the hostname to `testsmdpplus1.example.com` in the
|
||||
`X509v3 Subject Alternative Name` extension. Using a bare IP address as hostname may cause the certificate to be
|
||||
rejected by the LPA.
|
||||
|
||||
It must also accept the TLS certificates used by your TLS proxy.
|
||||
|
||||
Supported eUICC
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
If you run osmo-smdpp with the included SGP.26 (DPauth, DPpb) certificates, you must use an eUICC with matching SGP.26
|
||||
If you run osmo-smdpp with the included SGP.26 certificates, you must use an eUICC with matching SGP.26
|
||||
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
|
||||
in turn must be signed by that SGP.26 EUM certificate.
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ The response body is a JSON document, either
|
||||
#. key freshness failure
|
||||
#. unspecified card error
|
||||
|
||||
Example (success):
|
||||
Example (succcess):
|
||||
::
|
||||
|
||||
{
|
||||
|
||||
@@ -17,7 +17,7 @@ In any case, in order to operate a SUCI-enabled 5G SA network, you will have to
|
||||
#. deploy the public key on your USIMs
|
||||
#. deploy the private key on your 5GC, specifically the UDM function
|
||||
|
||||
pysim contains (in its `contrib` directory) a small utility program that can make it easy to generate
|
||||
pysim contains (int its `contrib` directory) a small utility program that can make it easy to generate
|
||||
such keys: `suci-keytool.py`
|
||||
|
||||
Generating keys
|
||||
|
||||
@@ -30,7 +30,7 @@ This guide covers the basic workflow of provisioning SIM cards with the 5G SUCI
|
||||
|
||||
For specific information on sysmocom SIM cards, refer to
|
||||
|
||||
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the current
|
||||
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the curent
|
||||
sysmoISIM-SJA5 product
|
||||
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
|
||||
sysmoISIM-SJA2 product
|
||||
|
||||
46
osmo-ras.py
46
osmo-ras.py
@@ -1,46 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Remote Application Server for Remote Application Management over HTTP
|
||||
# See Amendment B of the GlobalPlatform Card Specification v2.2
|
||||
#
|
||||
# (C) 2025 sysmocom s.f.m.c.
|
||||
# Author: Daniel Willmann <dwillmann@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
||||
from ssl import PROTOCOL_TLS_SERVER, SSLContext, TLSVersion
|
||||
|
||||
|
||||
context = SSLContext(PROTOCOL_TLS_SERVER)
|
||||
context.maximum_version = TLSVersion.TLSv1_2
|
||||
CIPHERS_1_0 = "TLS_PSK_WITH_3DES_EDE_CBC_SHA,TLS_PSK_WITH_AES_128_CBC_SHA,TLS_PSK_WITH_NULL_SHA"
|
||||
CIPHERS_1_2 = "TLS_PSK_WITH_AES_128_CBC_SHA256,TLS_PSK_WITH_NULL_SHA256"
|
||||
context.set_ciphers(CIPHERS_1_2)
|
||||
|
||||
# A table using the identity of the client:
|
||||
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
|
||||
'ClientId_2': bytes.fromhex('facade')
|
||||
}
|
||||
|
||||
def get_psk(ident):
|
||||
""" Get the PSK for the client """
|
||||
print(f"Get PSK for {ident}")
|
||||
return psk_table.get(ident, b'')
|
||||
|
||||
context.set_psk_server_callback(get_psk)
|
||||
|
||||
server = HTTPServer(("0.0.0.0", 8080), SimpleHTTPRequestHandler)
|
||||
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||
server.serve_forever()
|
||||
418
osmo-smdpp.py
418
osmo-smdpp.py
@@ -17,139 +17,51 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# asn1tools issue https://github.com/eerimoq/asn1tools/issues/194
|
||||
# must be first here
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
|
||||
from cryptography import x509
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import uuid
|
||||
import os
|
||||
import functools
|
||||
from typing import Optional, Dict, List
|
||||
from pprint import pprint as pp
|
||||
|
||||
import base64
|
||||
from base64 import b64decode
|
||||
from klein import Klein
|
||||
from twisted.web.iweb import IRequest
|
||||
import asn1tools
|
||||
import asn1tools.codecs.ber
|
||||
import asn1tools.codecs.der
|
||||
# do not move the code
|
||||
def fix_asn1_oid_decoding():
|
||||
fix_asn1_schema = """
|
||||
TestModule DEFINITIONS ::= BEGIN
|
||||
TestOid ::= SEQUENCE {
|
||||
oid OBJECT IDENTIFIER
|
||||
}
|
||||
END
|
||||
"""
|
||||
|
||||
fix_asn1_asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||
fix_asn1_oid_string = '2.999.10'
|
||||
fix_asn1_encoded = fix_asn1_asn1.encode('TestOid', {'oid': fix_asn1_oid_string})
|
||||
fix_asn1_decoded = fix_asn1_asn1.decode('TestOid', fix_asn1_encoded)
|
||||
from osmocom.utils import h2b, b2h, swap_nibbles
|
||||
|
||||
if (fix_asn1_decoded['oid'] != fix_asn1_oid_string):
|
||||
# ASN.1 OBJECT IDENTIFIER Decoding Issue:
|
||||
#
|
||||
# In ASN.1 BER/DER encoding, the first two arcs of an OBJECT IDENTIFIER are
|
||||
# combined into a single value: (40 * arc0) + arc1. This is encoded as a base-128
|
||||
# variable-length quantity (and commonly known as VLQ or base-128 encoding)
|
||||
# as specified in ITU-T X.690 §8.19, it can span multiple bytes if
|
||||
# the value is large.
|
||||
#
|
||||
# For arc0 = 0 or 1, arc1 must be in [0, 39]. For arc0 = 2, arc1 can be any non-negative integer.
|
||||
# All subsequent arcs (arc2, arc3, ...) are each encoded as a separate base-128 VLQ.
|
||||
#
|
||||
# The decoding bug occurs when the decoder does not properly split the first
|
||||
# subidentifier for arc0 = 2 and arc1 >= 40. Instead of decoding:
|
||||
# - arc0 = 2
|
||||
# - arc1 = (first_subidentifier - 80)
|
||||
# it may incorrectly interpret the first_subidentifier as arc0 = (first_subidentifier // 40),
|
||||
# arc1 = (first_subidentifier % 40), which is only valid for arc1 < 40.
|
||||
#
|
||||
# This patch handles it properly for all valid OBJECT IDENTIFIERs
|
||||
# with large second arcs, by applying the ASN.1 rules:
|
||||
# - if first_subidentifier < 40: arc0 = 0, arc1 = first_subidentifier
|
||||
# - elif first_subidentifier < 80: arc0 = 1, arc1 = first_subidentifier - 40
|
||||
# - else: arc0 = 2, arc1 = first_subidentifier - 80
|
||||
#
|
||||
# This problem is not uncommon, see for example https://github.com/randombit/botan/issues/4023
|
||||
|
||||
def fixed_decode_object_identifier(data, offset, end_offset):
|
||||
"""Decode ASN.1 OBJECT IDENTIFIER from bytes to dotted string, fixing large second arc handling."""
|
||||
def read_subidentifier(data, offset):
|
||||
value = 0
|
||||
while True:
|
||||
b = data[offset]
|
||||
value = (value << 7) | (b & 0x7F)
|
||||
offset += 1
|
||||
if not (b & 0x80):
|
||||
break
|
||||
return value, offset
|
||||
|
||||
subid, offset = read_subidentifier(data, offset)
|
||||
if subid < 40:
|
||||
first = 0
|
||||
second = subid
|
||||
elif subid < 80:
|
||||
first = 1
|
||||
second = subid - 40
|
||||
else:
|
||||
first = 2
|
||||
second = subid - 80
|
||||
arcs = [first, second]
|
||||
|
||||
while offset < end_offset:
|
||||
subid, offset = read_subidentifier(data, offset)
|
||||
arcs.append(subid)
|
||||
|
||||
return '.'.join(str(x) for x in arcs)
|
||||
|
||||
asn1tools.codecs.ber.decode_object_identifier = fixed_decode_object_identifier
|
||||
asn1tools.codecs.der.decode_object_identifier = fixed_decode_object_identifier
|
||||
|
||||
# test our patch
|
||||
asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||
decoded = asn1.decode('TestOid', fix_asn1_encoded)['oid']
|
||||
assert fix_asn1_oid_string == str(decoded)
|
||||
|
||||
fix_asn1_oid_decoding()
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature # noqa: E402
|
||||
from cryptography import x509 # noqa: E402
|
||||
from cryptography.exceptions import InvalidSignature # noqa: E402
|
||||
from cryptography.hazmat.primitives import hashes # noqa: E402
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, dh # noqa: E402
|
||||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat # noqa: E402
|
||||
from pathlib import Path # noqa: E402
|
||||
import json # noqa: E402
|
||||
import sys # noqa: E402
|
||||
import argparse # noqa: E402
|
||||
import uuid # noqa: E402
|
||||
import os # noqa: E402
|
||||
import functools # noqa: E402
|
||||
from typing import Optional, Dict, List # noqa: E402
|
||||
from pprint import pprint as pp # noqa: E402
|
||||
|
||||
import base64 # noqa: E402
|
||||
from base64 import b64decode # noqa: E402
|
||||
from klein import Klein # noqa: E402
|
||||
from twisted.web.iweb import IRequest # noqa: E402
|
||||
|
||||
from osmocom.utils import h2b, b2h, swap_nibbles # noqa: E402
|
||||
|
||||
import pySim.esim.rsp as rsp # noqa: E402
|
||||
from pySim.esim import saip, PMO # noqa: E402
|
||||
from pySim.esim.es8p import ProfileMetadata,UnprotectedProfilePackage,ProtectedProfilePackage,BoundProfilePackage,BspInstance # noqa: E402
|
||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id # noqa: E402
|
||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError # noqa: E402
|
||||
|
||||
import logging # noqa: E402
|
||||
logger = logging.getLogger(__name__)
|
||||
import pySim.esim.rsp as rsp
|
||||
from pySim.esim import saip, PMO
|
||||
from pySim.esim.es8p import *
|
||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
||||
|
||||
# HACK: make this configurable
|
||||
DATA_DIR = './smdpp-data'
|
||||
HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
|
||||
HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
|
||||
|
||||
|
||||
def b64encode2str(req: bytes) -> str:
|
||||
"""Encode given input bytes as base64 and return result as string."""
|
||||
return base64.b64encode(req).decode('ascii')
|
||||
|
||||
|
||||
def set_headers(request: IRequest):
|
||||
"""Set the request headers as mandatory by GSMA eSIM RSP."""
|
||||
request.setHeader('Content-Type', 'application/json;charset=UTF-8')
|
||||
request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
|
||||
|
||||
|
||||
def validate_request_headers(request: IRequest):
|
||||
"""Validate mandatory HTTP headers according to SGP.22."""
|
||||
content_type = request.getHeader('Content-Type')
|
||||
@@ -171,18 +83,18 @@ def get_eum_certificate_variant(eum_cert) -> str:
|
||||
|
||||
for policy in cert_policies_ext.value:
|
||||
policy_oid = policy.policy_identifier.dotted_string
|
||||
logger.debug(f"Found certificate policy: {policy_oid}")
|
||||
print(f"Found certificate policy: {policy_oid}")
|
||||
|
||||
if policy_oid == '2.23.146.1.2.1.2':
|
||||
logger.debug("Detected EUM certificate variant: O (old)")
|
||||
print("Detected EUM certificate variant: O (old)")
|
||||
return 'O'
|
||||
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
||||
logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
||||
print("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
||||
return 'NEW'
|
||||
except x509.ExtensionNotFound:
|
||||
logger.debug("No Certificate Policies extension found")
|
||||
print("No Certificate Policies extension found")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error checking certificate policies: {e}")
|
||||
print(f"Error checking certificate policies: {e}")
|
||||
|
||||
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||
"""Extract permitted IINs from EUM certificate using the appropriate method
|
||||
@@ -195,15 +107,17 @@ def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||
|
||||
if cert_variant == 'O':
|
||||
# Old variant - use nameConstraints extension
|
||||
print("Using nameConstraints parsing for variant O certificate")
|
||||
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
||||
|
||||
else:
|
||||
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
||||
print("Using GSMA permittedEins parsing for newer certificate variant")
|
||||
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
||||
|
||||
unique_iins = list(set(permitted_iins))
|
||||
|
||||
logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}")
|
||||
print(f"Total unique permitted IINs found: {len(unique_iins)}")
|
||||
return unique_iins
|
||||
|
||||
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||
@@ -217,13 +131,15 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||
|
||||
for ext in eum_cert.extensions:
|
||||
if ext.oid == permitted_eins_oid:
|
||||
logger.debug(f"Found GSMA permittedEins extension: {ext.oid}")
|
||||
print(f"Found GSMA permittedEins extension: {ext.oid}")
|
||||
|
||||
# Get the DER-encoded extension value
|
||||
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
||||
|
||||
if isinstance(ext_der, bytes):
|
||||
try:
|
||||
import asn1tools
|
||||
|
||||
permitted_eins_schema = """
|
||||
PermittedEins DEFINITIONS ::= BEGIN
|
||||
PermittedEins ::= SEQUENCE OF PrintableString
|
||||
@@ -241,14 +157,14 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
||||
len(iin_clean) % 2 == 0):
|
||||
permitted_iins.append(iin_clean)
|
||||
logger.debug(f"Found permitted IIN (GSMA): {iin_clean}")
|
||||
print(f"Found permitted IIN (GSMA): {iin_clean}")
|
||||
else:
|
||||
logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
||||
print(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error parsing GSMA permittedEins extension: {e}")
|
||||
print(f"Error parsing GSMA permittedEins extension: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error accessing GSMA certificate extensions: {e}")
|
||||
print(f"Error accessing GSMA certificate extensions: {e}")
|
||||
|
||||
return permitted_iins
|
||||
|
||||
@@ -263,11 +179,13 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
||||
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
||||
)
|
||||
|
||||
print("Found nameConstraints extension (variant O)")
|
||||
name_constraints = name_constraints_ext.value
|
||||
|
||||
# Check permittedSubtrees for IIN constraints
|
||||
if name_constraints.permitted_subtrees:
|
||||
for subtree in name_constraints.permitted_subtrees:
|
||||
print(f"Processing permitted subtree: {subtree}")
|
||||
|
||||
if isinstance(subtree, x509.DirectoryName):
|
||||
for attribute in subtree.value:
|
||||
@@ -279,12 +197,12 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
||||
all(c in '0123456789ABCDEF' for c in serial_value) and
|
||||
len(serial_value) % 2 == 0):
|
||||
permitted_iins.append(serial_value)
|
||||
logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
||||
print(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
||||
|
||||
except x509.ExtensionNotFound:
|
||||
logger.debug("No nameConstraints extension found")
|
||||
print("No nameConstraints extension found")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error parsing nameConstraints: {e}")
|
||||
print(f"Error parsing nameConstraints: {e}")
|
||||
|
||||
return permitted_iins
|
||||
|
||||
@@ -292,40 +210,41 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
||||
def validate_eid_range(eid: str, eum_cert) -> bool:
|
||||
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
||||
if not eid or len(eid) != 32:
|
||||
logger.debug(f"Invalid EID format: {eid}")
|
||||
print(f"Invalid EID format: {eid}")
|
||||
return False
|
||||
|
||||
try:
|
||||
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
||||
|
||||
if not permitted_eins:
|
||||
logger.debug("Warning: No permitted EINs found in EUM certificate")
|
||||
print("Warning: No permitted EINs found in EUM certificate")
|
||||
return False
|
||||
|
||||
eid_normalized = eid.upper()
|
||||
logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
||||
print(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
||||
|
||||
for permitted_ein in permitted_eins:
|
||||
if eid_normalized.startswith(permitted_ein):
|
||||
logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
||||
print(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
||||
return True
|
||||
|
||||
logger.debug(f"EID {eid_normalized} is not in any permitted EIN list")
|
||||
print(f"EID {eid_normalized} is not in any permitted EIN list")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error validating EID: {e}")
|
||||
print(f"Error validating EID: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
||||
r = {'subjectCode': subject_code, 'reasonCode': reason_code }
|
||||
r = {'subjectCode': subject_code, 'reasonCode': reason_code}
|
||||
if subject_id:
|
||||
r['subjectIdentifier'] = subject_id
|
||||
if message:
|
||||
r['message'] = message
|
||||
return r
|
||||
|
||||
def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data = None) -> None:
|
||||
def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data=None) -> None:
|
||||
# SGP.22 v3.0 6.5.1.4
|
||||
js['header'] = {
|
||||
'functionExecutionStatus': {
|
||||
@@ -373,20 +292,20 @@ class SmDppHttpServer:
|
||||
with open(os.path.join(dirpath, filename), 'rb') as f:
|
||||
cert = x509.load_pem_x509_certificate(f.read())
|
||||
if cert:
|
||||
# verify it is a CI certificate (keyCertSign + i-rspRole-ci)
|
||||
if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
|
||||
raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
|
||||
# # verify it is a CI certificate (keyCertSign + i-rspRole-ci)
|
||||
# if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
|
||||
# raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
|
||||
certs.append(cert)
|
||||
return certs
|
||||
|
||||
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
||||
"""Find CI certificate for given key identifier."""
|
||||
for cert in self.ci_certs:
|
||||
logger.debug("cert: %s" % cert)
|
||||
print("cert: %s" % cert)
|
||||
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
||||
logger.debug(subject_exts)
|
||||
print(subject_exts)
|
||||
subject_pkid = subject_exts[0].value
|
||||
logger.debug(subject_pkid)
|
||||
print(subject_pkid)
|
||||
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
||||
return cert
|
||||
return None
|
||||
@@ -405,13 +324,13 @@ class SmDppHttpServer:
|
||||
continue
|
||||
return False
|
||||
|
||||
def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False):
|
||||
def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
|
||||
self.server_hostname = server_hostname
|
||||
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
|
||||
self.ci_certs = self.load_certs_from_path(ci_certs_path)
|
||||
# load DPauth cert + key
|
||||
self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
|
||||
cert_dir = common_cert_path
|
||||
cert_dir = os.path.join(DATA_DIR, 'certs')
|
||||
if use_brainpool:
|
||||
self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
|
||||
self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
|
||||
@@ -426,15 +345,7 @@ class SmDppHttpServer:
|
||||
else:
|
||||
self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
|
||||
self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
|
||||
if in_memory:
|
||||
self.rss = rsp.RspSessionStore(in_memory=True)
|
||||
logger.info("Using in-memory session storage")
|
||||
else:
|
||||
# Use different session database files for BRP and NIST to avoid file locking during concurrent runs
|
||||
session_db_suffix = "BRP" if use_brainpool else "NIST"
|
||||
db_path = os.path.join(DATA_DIR, f"sm-dp-sessions-{session_db_suffix}")
|
||||
self.rss = rsp.RspSessionStore(filename=db_path, in_memory=False)
|
||||
logger.info(f"Using file-based session storage: {db_path}")
|
||||
self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
|
||||
|
||||
@app.handle_errors(ApiError)
|
||||
def handle_apierror(self, request: IRequest, failure):
|
||||
@@ -461,7 +372,7 @@ class SmDppHttpServer:
|
||||
validate_request_headers(request)
|
||||
|
||||
content = json.loads(request.content.read())
|
||||
logger.debug("Rx JSON: %s" % json.dumps(content))
|
||||
# print("Rx JSON: %s" % json.dumps(content))
|
||||
set_headers(request)
|
||||
|
||||
output = func(self, request, content)
|
||||
@@ -469,7 +380,7 @@ class SmDppHttpServer:
|
||||
return ''
|
||||
|
||||
build_resp_header(output)
|
||||
logger.debug("Tx JSON: %s" % json.dumps(output))
|
||||
# print("Tx JSON: %s" % json.dumps(output))
|
||||
return json.dumps(output)
|
||||
return _api_wrapper
|
||||
|
||||
@@ -480,7 +391,7 @@ class SmDppHttpServer:
|
||||
# Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
|
||||
# case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused".
|
||||
if content['smdpAddress'] != self.server_hostname:
|
||||
raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
|
||||
raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
|
||||
|
||||
euiccChallenge = b64decode(content['euiccChallenge'])
|
||||
if len(euiccChallenge) != 16:
|
||||
@@ -488,8 +399,8 @@ class SmDppHttpServer:
|
||||
|
||||
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
||||
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
||||
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
#euiccInfo1['svn']
|
||||
print("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
# euiccInfo1['svn']
|
||||
|
||||
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
||||
|
||||
@@ -498,9 +409,9 @@ class SmDppHttpServer:
|
||||
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
|
||||
|
||||
# Validate that SM-DP+ supports certificate chains for verification
|
||||
verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
|
||||
if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
|
||||
raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
|
||||
# verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
|
||||
# if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
|
||||
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
|
||||
|
||||
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
|
||||
ci_cert = None
|
||||
@@ -514,7 +425,7 @@ class SmDppHttpServer:
|
||||
else:
|
||||
ci_cert = None
|
||||
if not ci_cert:
|
||||
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
|
||||
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
|
||||
|
||||
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
|
||||
# SHALL be unique within the scope and lifetime of each SM-DP+.
|
||||
@@ -530,10 +441,10 @@ class SmDppHttpServer:
|
||||
'euiccChallenge': euiccChallenge,
|
||||
'serverAddress': self.server_hostname,
|
||||
'serverChallenge': serverChallenge,
|
||||
}
|
||||
logger.debug("Tx serverSigned1: %s" % serverSigned1)
|
||||
}
|
||||
print("Tx serverSigned1: %s" % serverSigned1)
|
||||
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
||||
logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||
output = {}
|
||||
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
||||
|
||||
@@ -544,9 +455,9 @@ class SmDppHttpServer:
|
||||
output['transactionId'] = transactionId
|
||||
server_cert_aki = self.dp_auth.get_authority_key_identifier()
|
||||
output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
|
||||
output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
|
||||
output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
|
||||
# FIXME: add those certificate
|
||||
#output['otherCertsInChain'] = b64encode2str()
|
||||
# output['otherCertsInChain'] = b64encode2str()
|
||||
|
||||
# create SessionState and store it in rss
|
||||
self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge,
|
||||
@@ -562,11 +473,11 @@ class SmDppHttpServer:
|
||||
|
||||
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
||||
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
||||
logger.debug("Rx %s: %s" % authenticateServerResp)
|
||||
print("Rx %s: %s" % authenticateServerResp)
|
||||
if authenticateServerResp[0] == 'authenticateResponseError':
|
||||
r_err = authenticateServerResp[1]
|
||||
#r_err['transactionId']
|
||||
#r_err['authenticateErrorCode']
|
||||
# r_err['transactionId']
|
||||
# r_err['authenticateErrorCode']
|
||||
raise ValueError("authenticateResponseError %s" % r_err)
|
||||
|
||||
r_ok = authenticateServerResp[1]
|
||||
@@ -590,7 +501,7 @@ class SmDppHttpServer:
|
||||
if ss is None:
|
||||
raise ApiError('8.10.1', '3.9', 'Unknown')
|
||||
ss.euicc_cert = euicc_cert
|
||||
ss.eum_cert = eum_cert # TODO: do we need this in the state?
|
||||
ss.eum_cert = eum_cert # TODO: do we need this in the state?
|
||||
|
||||
# Verify that the Root Certificate of the eUICC certificate chain corresponds to the
|
||||
# euiccCiPKIdToBeUsed or TODO: euiccCiPKIdToBeUsedV3
|
||||
@@ -607,14 +518,13 @@ class SmDppHttpServer:
|
||||
raise ApiError('8.1.3', '6.1', 'Verification failed (certificate chain)')
|
||||
# raise ApiError('8.1.3', '6.3', 'Expired')
|
||||
|
||||
|
||||
# Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
|
||||
# Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
|
||||
if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
|
||||
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
||||
|
||||
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
||||
logger.debug("EID (from eUICC cert): %s" % ss.eid)
|
||||
print("EID (from eUICC cert): %s" % ss.eid)
|
||||
|
||||
# Verify EID is within permitted range of EUM certificate
|
||||
if not validate_eid_range(ss.eid, eum_cert):
|
||||
@@ -629,7 +539,6 @@ class SmDppHttpServer:
|
||||
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
|
||||
# TODO: We really do a very simplistic job here, this needs to be properly implemented later,
|
||||
# considering all the various cases, profile state, etc.
|
||||
iccid_str = None
|
||||
if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
|
||||
cpca = euiccSigned1['ctxParams1'][1]
|
||||
matchingId = cpca.get('matchingId', None)
|
||||
@@ -640,7 +549,7 @@ class SmDppHttpServer:
|
||||
# look up profile based on matchingID. We simply check if a given file exists for now..
|
||||
path = os.path.join(self.upp_dir, matchingId) + '.der'
|
||||
# prevent directory traversal attack
|
||||
if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
|
||||
if os.path.commonprefix((os.path.realpath(path), self.upp_dir)) != self.upp_dir:
|
||||
raise ApiError('8.2.6', '3.8', 'Refused')
|
||||
if not os.path.isfile(path) or not os.access(path, os.R_OK):
|
||||
raise ApiError('8.2.6', '3.8', 'Refused')
|
||||
@@ -652,7 +561,7 @@ class SmDppHttpServer:
|
||||
# there's currently no other option in the ctxParams1 choice, so this cannot happen
|
||||
raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication')
|
||||
|
||||
# FIXME: we actually want to perform the profile binding herr, and read the profile metadata from the profile
|
||||
# FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
|
||||
|
||||
# Put together profileMetadata + _bin
|
||||
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
|
||||
@@ -665,8 +574,8 @@ class SmDppHttpServer:
|
||||
smdpSigned2 = {
|
||||
'transactionId': h2b(ss.transactionId),
|
||||
'ccRequiredFlag': False, # whether the Confirmation Code is required
|
||||
#'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
|
||||
}
|
||||
# 'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
|
||||
}
|
||||
smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2)
|
||||
|
||||
ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)
|
||||
@@ -678,7 +587,7 @@ class SmDppHttpServer:
|
||||
'profileMetadata': b64encode2str(profileMetadata_bin),
|
||||
'smdpSigned2': b64encode2str(smdpSigned2_bin),
|
||||
'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
|
||||
'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
|
||||
'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
|
||||
}
|
||||
|
||||
@app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
|
||||
@@ -694,12 +603,12 @@ class SmDppHttpServer:
|
||||
|
||||
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
||||
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
||||
logger.debug("Rx %s: %s" % prepDownloadResp)
|
||||
print("Rx %s: %s" % prepDownloadResp)
|
||||
|
||||
if prepDownloadResp[0] == 'downloadResponseError':
|
||||
r_err = prepDownloadResp[1]
|
||||
#r_err['transactionId']
|
||||
#r_err['downloadErrorCode']
|
||||
# r_err['transactionId']
|
||||
# r_err['downloadErrorCode']
|
||||
raise ValueError("downloadResponseError %s" % r_err)
|
||||
|
||||
r_ok = prepDownloadResp[1]
|
||||
@@ -716,46 +625,60 @@ class SmDppHttpServer:
|
||||
|
||||
# store otPK.EUICC.ECKA in session state
|
||||
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
||||
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
|
||||
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
||||
# Reference value of CERT.DPpb.ECDDSA
|
||||
logger.debug("curve = %s" % self.dp_pb.get_curve())
|
||||
print("curve = %s" % self.dp_pb.get_curve())
|
||||
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
||||
# extract the public key in (hopefully) the right format for the ES8+ interface
|
||||
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
# print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
# print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
|
||||
ss.host_id = b'mahlzeit'
|
||||
|
||||
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
||||
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
||||
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
||||
logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
|
||||
print("shared_secret: %s" % b2h(ss.shared_secret))
|
||||
|
||||
# TODO: Check if this order requires a Confirmation Code verification
|
||||
|
||||
# Perform actual protection + binding of profile package (or return pre-bound one)
|
||||
with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
|
||||
upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
|
||||
# HACK: Use empty PPP as we're still debugging the configureISDP step, and we want to avoid
|
||||
# HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
|
||||
# cluttering the log with stuff happening after the failure
|
||||
#upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
|
||||
# upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
|
||||
if False:
|
||||
# Use random keys
|
||||
bpp = BoundProfilePackage.from_upp(upp)
|
||||
else:
|
||||
# Use session keys
|
||||
# Use sesssion keys
|
||||
ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
|
||||
bpp = BoundProfilePackage.from_ppp(ppp)
|
||||
|
||||
# update non-volatile state with updated ss object
|
||||
self.rss[transactionId] = ss
|
||||
return {
|
||||
rv = {
|
||||
'transactionId': transactionId,
|
||||
'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
|
||||
}
|
||||
import bsp_test_integration as integ
|
||||
integration = integ.BspTestIntegration()
|
||||
bpp_der = base64.b64decode(rv['boundProfilePackage']) #.decode('ascii')
|
||||
verification = integration.verify_bound_profile_package(
|
||||
shared_secret=ss.shared_secret,
|
||||
key_type=0x88,
|
||||
key_length=16,
|
||||
host_id=ss.host_id,
|
||||
eid=h2b(ss.eid),
|
||||
bpp_der=bpp_der
|
||||
)
|
||||
|
||||
assert verification['success'], f"BPP verification failed: {verification['error']}"
|
||||
return rv
|
||||
|
||||
@app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
|
||||
@rsp_api_wrapper
|
||||
@@ -766,22 +689,22 @@ class SmDppHttpServer:
|
||||
request.setResponseCode(204)
|
||||
pendingNotification_bin = b64decode(content['pendingNotification'])
|
||||
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
||||
logger.debug("Rx %s: %s" % pendingNotification)
|
||||
print("Rx %s: %s" % pendingNotification)
|
||||
if pendingNotification[0] == 'profileInstallationResult':
|
||||
profileInstallRes = pendingNotification[1]
|
||||
pird = profileInstallRes['profileInstallationResultData']
|
||||
transactionId = b2h(pird['transactionId'])
|
||||
ss = self.rss.get(transactionId, None)
|
||||
if ss is None:
|
||||
logger.warning(f"Unable to find session for transactionId: {transactionId}")
|
||||
return None # Will return HTTP 204 with empty body
|
||||
print("Unable to find session for transactionId")
|
||||
return
|
||||
profileInstallRes['euiccSignPIR']
|
||||
# TODO: use original data, don't re-encode?
|
||||
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||
# verify eUICC signature
|
||||
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
||||
raise Exception('ECDSA signature verification failed on notification')
|
||||
logger.debug("Profile Installation Final Result: %s", pird['finalResult'])
|
||||
print("Profile Installation Final Result: ", pird['finalResult'])
|
||||
# remove session state
|
||||
del self.rss[transactionId]
|
||||
elif pendingNotification[0] == 'otherSignedNotification':
|
||||
@@ -806,20 +729,20 @@ class SmDppHttpServer:
|
||||
iccid = other_notif.get('iccid', None)
|
||||
if iccid:
|
||||
iccid = swap_nibbles(b2h(iccid))
|
||||
logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||
print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||
else:
|
||||
raise ValueError(pendingNotification)
|
||||
|
||||
#@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
|
||||
#@rsp_api_wrapper
|
||||
#"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
|
||||
# @app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
|
||||
# @rsp_api_wrapper
|
||||
# """See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
|
||||
# TODO: implement this
|
||||
|
||||
@app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
|
||||
@rsp_api_wrapper
|
||||
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
||||
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
||||
logger.debug("Rx JSON: %s" % content)
|
||||
print("Rx JSON: %s" % content)
|
||||
transactionId = content['transactionId']
|
||||
|
||||
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
||||
@@ -829,7 +752,7 @@ class SmDppHttpServer:
|
||||
|
||||
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
||||
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
||||
logger.debug("Rx %s: %s" % cancelSessionResponse)
|
||||
print("Rx %s: %s" % cancelSessionResponse)
|
||||
|
||||
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
||||
# FIXME: print some error
|
||||
@@ -856,58 +779,67 @@ class SmDppHttpServer:
|
||||
|
||||
# delete actual session data
|
||||
del self.rss[transactionId]
|
||||
return { 'transactionId': transactionId }
|
||||
return {'transactionId': transactionId}
|
||||
|
||||
|
||||
def main(argv):
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP(S) to", default="localhost")
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP(S) to", default=443)
|
||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||
parser.add_argument("-s", "--nossl", help="disable built in SSL/TLS support", action='store_true', default=False)
|
||||
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||
parser.add_argument("-b", "--brainpool", help="Use Brainpool curves instead of NIST",
|
||||
action='store_true', default=False)
|
||||
parser.add_argument("-m", "--in-memory", help="Use ephermal in-memory session storage (for concurrent runs)",
|
||||
action='store_true', default=False)
|
||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
||||
# parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
|
||||
hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=False)
|
||||
# hs.app.run(HOSTNAME,endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
|
||||
|
||||
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
||||
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool)
|
||||
if(args.nossl):
|
||||
hs.app.run(args.host, args.port)
|
||||
else:
|
||||
curve_type = 'BRP' if args.brainpool else 'NIST'
|
||||
cert_derpath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.der'
|
||||
cert_pempath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.pem'
|
||||
cert_skpath = Path(common_cert_path) / 'DPtls' / f'SK_S_SM_DP_TLS_{curve_type}.pem'
|
||||
dhparam_path = Path(common_cert_path) / "dhparam2048.pem"
|
||||
if not dhparam_path.exists():
|
||||
print("Generating dh params, this takes a few seconds..")
|
||||
# Generate DH parameters with 2048-bit key size and generator 2
|
||||
parameters = dh.generate_parameters(generator=2, key_size=2048)
|
||||
pem_data = parameters.parameter_bytes(encoding=Encoding.PEM,format=ParameterFormat.PKCS3)
|
||||
with open(dhparam_path, 'wb') as file:
|
||||
file.write(pem_data)
|
||||
print("DH params created successfully")
|
||||
from cryptography.hazmat.primitives.asymmetric import dh
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from pathlib import Path
|
||||
|
||||
if not cert_pempath.exists():
|
||||
print("Translating tls server cert from DER to PEM..")
|
||||
with open(cert_derpath, 'rb') as der_file:
|
||||
der_cert_data = der_file.read()
|
||||
cert_derpath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.der'
|
||||
cert_pempath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.pem'
|
||||
cert_skpath = Path(DATA_DIR) / 'certs' / 'DPtls' / 'SK_S_SM_DP_TLS_NIST.pem'
|
||||
dhparam_path = Path("dhparam2048.pem")
|
||||
if not dhparam_path.exists():
|
||||
print("Generating dh params, this takes a few seconds..")
|
||||
# Generate DH parameters with 2048-bit key size and generator 2
|
||||
parameters = dh.generate_parameters(generator=2, key_size=2048)
|
||||
pem_data = parameters.parameter_bytes(encoding=serialization.Encoding.PEM,format=serialization.ParameterFormat.PKCS3)
|
||||
with open(dhparam_path, 'wb') as file:
|
||||
file.write(pem_data)
|
||||
print("DH params created successfully")
|
||||
|
||||
cert = x509.load_der_x509_certificate(der_cert_data)
|
||||
pem_cert = cert.public_bytes(Encoding.PEM) #.decode('utf-8')
|
||||
if not cert_pempath.exists():
|
||||
print("Translating tls server cert from DER to PEM..")
|
||||
with open(cert_derpath, 'rb') as der_file:
|
||||
der_cert_data = der_file.read()
|
||||
|
||||
with open(cert_pempath, 'wb') as pem_file:
|
||||
pem_file.write(pem_cert)
|
||||
cert = x509.load_der_x509_certificate(der_cert_data)
|
||||
pem_cert = cert.public_bytes(serialization.Encoding.PEM) #.decode('utf-8')
|
||||
|
||||
SERVER_STRING = f'ssl:{args.port}:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}'
|
||||
print(SERVER_STRING)
|
||||
with open(cert_pempath, 'wb') as pem_file:
|
||||
pem_file.write(pem_cert)
|
||||
|
||||
SERVER_STRING = f'ssl:8000:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}'
|
||||
print(SERVER_STRING)
|
||||
|
||||
hs.app.run(HOSTNAME, endpoint_description=SERVER_STRING)
|
||||
# hs.app.run(args.host, args.port)
|
||||
|
||||
hs.app.run(host=HOSTNAME, port=args.port, endpoint_description=SERVER_STRING)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv)
|
||||
|
||||
|
||||
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ cp -a ../sgp26/SGP.26_v1.5_Certificates_18_07_2024/SGP.26_v1.5-2024_files/Valid\ Test\ Cases/SM-DP+/DPtls/CERT_S_SM_DP_TLS_NIST.der .
|
||||
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ cp -a ../sgp26/SGP.26_v1.5_Certificates_18_07_2024/SGP.26_v1.5-2024_files/Valid\ Test\ Cases/SM-DP+/DPtls/SK_S_SM_DP_TLS_NIST.pem .
|
||||
# (.venv) ➜ ~/work/smdp/pysim git:(master) ✗ openssl x509 -inform der -in CERT_S_SM_DP_TLS_NIST.der -out CERT_S_SM_DP_TLS_NIST.pem
|
||||
|
||||
|
||||
# cp -a Variants\ A_B_C/CI/CERT_CI_SIG_* ../pysim/smdpp-data/certs/CertificateIssuer
|
||||
# cp -a Variants\ A_B_C/CI_subCA/CERT_*_SIG_* ../pysim/smdpp-data/certs/CertificateIssuer
|
||||
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPauth/CERT* ../pysim/smdpp-data/certs/DPauth
|
||||
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPpb/CERT* ../pysim/smdpp-data/certs/DPpb
|
||||
# cp -a Variants\ A_B_C/Variant\ C/SM-DP+/SM_DPtls/CERT* ../pysim/smdpp-data/certs/DPtls
|
||||
# cp -a Variants\ A_B_C/Variant\ C/EUM_SUB_CA/CERT_EUMSubCA_VARC_SIG_* ../pysim/smdpp-data/certs/intermediate
|
||||
@@ -586,7 +586,7 @@ def read_params_csv(opts, imsi=None, iccid=None):
|
||||
else:
|
||||
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
|
||||
|
||||
# NOTE: We might consider to specify a new CSV field "mnclen" in our
|
||||
# NOTE: We might concider to specify a new CSV field "mnclen" in our
|
||||
# CSV files for a better automatization. However, this only makes sense
|
||||
# when the tools and databases we export our files from will also add
|
||||
# such a field.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
#
|
||||
# Utility to display some information about a SIM card
|
||||
# Utility to display some informations about a SIM card
|
||||
#
|
||||
#
|
||||
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>
|
||||
|
||||
159
pySim-shell.py
159
pySim-shell.py
@@ -22,25 +22,19 @@ from typing import List, Optional
|
||||
import json
|
||||
import traceback
|
||||
import re
|
||||
|
||||
import cmd2
|
||||
from packaging import version
|
||||
from cmd2 import style
|
||||
|
||||
import logging
|
||||
from pySim.log import PySimLogger
|
||||
from osmocom.utils import auto_uint8
|
||||
|
||||
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
|
||||
if version.parse(cmd2.__version__) < version.parse("2.3.0"):
|
||||
from cmd2 import fg, bg # pylint: disable=no-name-in-module
|
||||
RED = fg.red
|
||||
YELLOW = fg.yellow
|
||||
LIGHT_RED = fg.bright_red
|
||||
LIGHT_GREEN = fg.bright_green
|
||||
else:
|
||||
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
|
||||
RED = Fg.RED
|
||||
YELLOW = Fg.YELLOW
|
||||
LIGHT_RED = Fg.LIGHT_RED
|
||||
LIGHT_GREEN = Fg.LIGHT_GREEN
|
||||
from cmd2 import CommandSet, with_default_category, with_argparser
|
||||
@@ -69,12 +63,10 @@ from pySim.ts_102_222 import Ts102222Commands
|
||||
from pySim.gsm_r import DF_EIRENE
|
||||
from pySim.cat import ProactiveCommand
|
||||
|
||||
from pySim.card_key_provider import CardKeyProviderCsv
|
||||
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
||||
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
||||
|
||||
from pySim.app import init_card
|
||||
|
||||
log = PySimLogger.get("main")
|
||||
|
||||
class Cmd2Compat(cmd2.Cmd):
|
||||
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
|
||||
@@ -100,19 +92,15 @@ class PysimApp(Cmd2Compat):
|
||||
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
|
||||
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
|
||||
|
||||
def __init__(self, verbose, card, rs, sl, ch, script=None):
|
||||
def __init__(self, card, rs, sl, ch, script=None):
|
||||
if version.parse(cmd2.__version__) < version.parse("2.0.0"):
|
||||
kwargs = {'use_ipython': True}
|
||||
else:
|
||||
kwargs = {'include_ipy': True}
|
||||
|
||||
self.verbose = verbose
|
||||
self._onchange_verbose('verbose', False, self.verbose);
|
||||
|
||||
# pylint: disable=unexpected-keyword-arg
|
||||
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
|
||||
auto_load_commands=False, startup_script=script, **kwargs)
|
||||
PySimLogger.setup(self.poutput, {logging.WARN: YELLOW})
|
||||
self.intro = style(self.BANNER, fg=RED)
|
||||
self.default_category = 'pySim-shell built-in commands'
|
||||
self.card = None
|
||||
@@ -138,9 +126,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
self.add_settable(Settable2Compat('apdu_strict', bool,
|
||||
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
|
||||
onchange_cb=self._onchange_apdu_strict))
|
||||
self.add_settable(Settable2Compat('verbose', bool,
|
||||
'Enable/disable verbose logging', self,
|
||||
onchange_cb=self._onchange_verbose))
|
||||
self.equip(card, rs)
|
||||
|
||||
def equip(self, card, rs):
|
||||
@@ -225,13 +210,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
else:
|
||||
self.card._scc._tp.apdu_strict = False
|
||||
|
||||
def _onchange_verbose(self, param_name, old, new):
|
||||
PySimLogger.set_verbose(new)
|
||||
if new == True:
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
else:
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
class Cmd2ApduTracer(ApduTracer):
|
||||
def __init__(self, cmd2_app):
|
||||
self.cmd2 = cmd2_app
|
||||
@@ -287,7 +265,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
def do_apdu(self, opts):
|
||||
"""Send a raw APDU to the card, and print SW + Response.
|
||||
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
|
||||
tracked. Depending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
||||
tracked. Dpending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
||||
a different file."""
|
||||
|
||||
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
|
||||
@@ -358,7 +336,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
|
||||
def _process_card(self, first, script_path):
|
||||
|
||||
# Early phase of card initialization (this part may fail with an exception)
|
||||
# Early phase of card initialzation (this part may fail with an exception)
|
||||
try:
|
||||
rs, card = init_card(self.sl)
|
||||
rc = self.equip(card, rs)
|
||||
@@ -399,7 +377,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
|
||||
bulk_script_parser = argparse.ArgumentParser()
|
||||
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
|
||||
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exception occurs',
|
||||
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exeption occurs',
|
||||
action='store_true')
|
||||
bulk_script_parser.add_argument('--tries', type=int, default=2,
|
||||
help='how many tries before trying the next card')
|
||||
@@ -499,23 +477,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
"""Echo (print) a string on the console"""
|
||||
self.poutput(' '.join(opts.STRING))
|
||||
|
||||
query_card_key_parser = argparse.ArgumentParser()
|
||||
query_card_key_parser.add_argument('FIELDS', help="fields to query", type=str, nargs='+')
|
||||
query_card_key_parser.add_argument('--key', help='lookup key (typically \'ICCID\' or \'EID\')',
|
||||
type=str, required=True)
|
||||
query_card_key_parser.add_argument('--value', help='lookup key match value (e.g \'8988211000000123456\')',
|
||||
type=str, required=True)
|
||||
@cmd2.with_argparser(query_card_key_parser)
|
||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||
def do_query_card_key(self, opts):
|
||||
"""Manually query the Card Key Provider"""
|
||||
result = card_key_provider_get(opts.FIELDS, opts.key, opts.value)
|
||||
self.poutput("Result:")
|
||||
if result == {}:
|
||||
self.poutput(" (none)")
|
||||
for k in result.keys():
|
||||
self.poutput(" %s: %s" % (str(k), str(result.get(k))))
|
||||
|
||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||
def do_version(self, opts):
|
||||
"""Print the pySim software version."""
|
||||
@@ -770,7 +731,7 @@ class PySimCommands(CommandSet):
|
||||
body = {}
|
||||
for t in tags:
|
||||
result = self._cmd.lchan.retrieve_data(t)
|
||||
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
||||
body[t] = b2h(val)
|
||||
else:
|
||||
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
|
||||
@@ -954,53 +915,36 @@ class Iso7816Commands(CommandSet):
|
||||
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def __select_pin_nr(pin_type:str, pin_nr:int) -> int:
|
||||
if pin_type:
|
||||
# pylint: disable=unsubscriptable-object
|
||||
return pin_names.inverse[pin_type]
|
||||
return pin_nr
|
||||
|
||||
@staticmethod
|
||||
def __add_pin_nr_to_ArgumentParser(chv_parser):
|
||||
group = chv_parser.add_mutually_exclusive_group()
|
||||
group.add_argument('--pin-type',
|
||||
choices=[x for x in pin_names.values()
|
||||
if (x.startswith('PIN') or x.startswith('2PIN'))],
|
||||
help='Specifiy pin type (default is PIN1)')
|
||||
group.add_argument('--pin-nr', type=auto_uint8, default=0x01,
|
||||
help='PIN Number, 1=PIN1, 0x81=2PIN1 or custom value (see also TS 102 221, Table 9.3")')
|
||||
|
||||
verify_chv_parser = argparse.ArgumentParser()
|
||||
verify_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(verify_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(verify_chv_parser)
|
||||
def do_verify_chv(self, opts):
|
||||
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
|
||||
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
|
||||
2PIN1 (see also TS 102 221 Section 9.5.1 / Table 9.3)."""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.verify_chv(pin_nr, h2b(pin))
|
||||
PIN2."""
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.verify_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV verification successful")
|
||||
|
||||
unblock_chv_parser = argparse.ArgumentParser()
|
||||
unblock_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
|
||||
help='PUK code value. If none given, CSV file will be queried')
|
||||
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(unblock_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(unblock_chv_parser)
|
||||
def do_unblock_chv(self, opts):
|
||||
"""Unblock PIN code using specified PUK code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||
puk = self.get_code(opts.PUK, "PUK" + str(pin_nr))
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
||||
puk = self.get_code(opts.PUK, "PUK" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.unblock_chv(
|
||||
pin_nr, h2b(puk), h2b(new_pin))
|
||||
opts.pin_nr, h2b(puk), h2b(new_pin))
|
||||
self._cmd.poutput("CHV unblock successful")
|
||||
|
||||
change_chv_parser = argparse.ArgumentParser()
|
||||
@@ -1008,42 +952,42 @@ class Iso7816Commands(CommandSet):
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(change_chv_parser)
|
||||
change_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
|
||||
@cmd2.with_argparser(change_chv_parser)
|
||||
def do_change_chv(self, opts):
|
||||
"""Change PIN code to a new PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.change_chv(
|
||||
pin_nr, h2b(pin), h2b(new_pin))
|
||||
opts.pin_nr, h2b(pin), h2b(new_pin))
|
||||
self._cmd.poutput("CHV change successful")
|
||||
|
||||
disable_chv_parser = argparse.ArgumentParser()
|
||||
disable_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(disable_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(disable_chv_parser)
|
||||
def do_disable_chv(self, opts):
|
||||
"""Disable PIN code using specified PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.disable_chv(pin_nr, h2b(pin))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.disable_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV disable successful")
|
||||
|
||||
enable_chv_parser = argparse.ArgumentParser()
|
||||
__add_pin_nr_to_ArgumentParser(enable_chv_parser)
|
||||
enable_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
|
||||
@cmd2.with_argparser(enable_chv_parser)
|
||||
def do_enable_chv(self, opts):
|
||||
"""Enable PIN code using specified PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.enable_chv(pin_nr, h2b(pin))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.enable_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV enable successful")
|
||||
|
||||
def do_deactivate_file(self, opts):
|
||||
@@ -1127,23 +1071,16 @@ argparse_add_reader_args(option_parser)
|
||||
global_group = option_parser.add_argument_group('General Options')
|
||||
global_group.add_argument('--script', metavar='PATH', default=None,
|
||||
help='script with pySim-shell commands to be executed automatically at start-up')
|
||||
global_group.add_argument('--csv', metavar='FILE',
|
||||
default=None, help='Read card data from CSV file')
|
||||
global_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help='per-CSV-column AES transport key')
|
||||
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
|
||||
help="Use automatic card handling machine")
|
||||
global_group.add_argument("--noprompt", help="Run in non interactive mode",
|
||||
action='store_true', default=False)
|
||||
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
|
||||
action='store_true', default=False)
|
||||
global_group.add_argument("--verbose", help="Enable verbose logging",
|
||||
action='store_true', default=False)
|
||||
|
||||
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
||||
card_key_group.add_argument('--csv', metavar='FILE',
|
||||
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
||||
help='Read card data from CSV file')
|
||||
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help=argparse.SUPPRESS, dest='column_key')
|
||||
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help='per-column AES transport key', dest='column_key')
|
||||
|
||||
adm_group = global_group.add_mutually_exclusive_group()
|
||||
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
||||
@@ -1158,27 +1095,23 @@ option_parser.add_argument("command", nargs='?',
|
||||
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
||||
help="Optional Arguments for command")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
startup_errors = False
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
# Ensure that we are able to print formatted warnings from the beginning.
|
||||
PySimLogger.setup(print, {logging.WARN: YELLOW})
|
||||
if (opts.verbose):
|
||||
PySimLogger.set_verbose(True)
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
else:
|
||||
PySimLogger.set_verbose(False)
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
# Register csv-file as card data provider, either from specified CSV
|
||||
# or from CSV file in home directory
|
||||
column_keys = {}
|
||||
for par in opts.column_key:
|
||||
csv_column_keys = {}
|
||||
for par in opts.csv_column_key:
|
||||
name, key = par.split(':')
|
||||
column_keys[name] = key
|
||||
if os.path.isfile(opts.csv):
|
||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
||||
csv_column_keys[name] = key
|
||||
csv_default = str(Path.home()) + "/.osmocom/pysim/card_data.csv"
|
||||
if opts.csv:
|
||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, csv_column_keys))
|
||||
if os.path.isfile(csv_default):
|
||||
card_key_provider_register(CardKeyProviderCsv(csv_default, csv_column_keys))
|
||||
|
||||
# Init card reader driver
|
||||
sl = init_reader(opts, proactive_handler = Proact())
|
||||
@@ -1194,7 +1127,7 @@ if __name__ == '__main__':
|
||||
# able to tolerate and recover from that.
|
||||
try:
|
||||
rs, card = init_card(sl, opts.skip_card_init)
|
||||
app = PysimApp(opts.verbose, card, rs, sl, ch)
|
||||
app = PysimApp(card, rs, sl, ch)
|
||||
except:
|
||||
startup_errors = True
|
||||
print("Card initialization (%s) failed with an exception:" % str(sl))
|
||||
@@ -1206,7 +1139,7 @@ if __name__ == '__main__':
|
||||
print(" it should also be noted that some readers may behave strangely when no card")
|
||||
print(" is inserted.)")
|
||||
print("")
|
||||
app = PysimApp(opts.verbose, None, None, sl, ch)
|
||||
app = PysimApp(None, None, sl, ch)
|
||||
|
||||
# If the user supplies an ADM PIN at via commandline args authenticate
|
||||
# immediately so that the user does not have to use the shell commands
|
||||
|
||||
@@ -53,7 +53,7 @@ from pySim.cards import UiccCardBase
|
||||
from pySim.exceptions import *
|
||||
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
|
||||
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
|
||||
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength, EventDownload, EventList
|
||||
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -71,46 +71,24 @@ class MyApduTracer(ApduTracer):
|
||||
print("-> %s %s" % (cmd[:10], cmd[10:]))
|
||||
print("<- %s: %s" % (sw, resp))
|
||||
|
||||
class TcpProtocol(protocol.Protocol):
|
||||
def dataReceived(self, data):
|
||||
pass
|
||||
|
||||
def connectionLost(self, reason):
|
||||
pass
|
||||
|
||||
|
||||
def tcp_connected_callback(p: protocol.Protocol):
|
||||
"""called by twisted TCP client."""
|
||||
logger.error("%s: connected!" % p)
|
||||
for data in p.pending_tx:
|
||||
p.transport.write(data)
|
||||
|
||||
class ProactChannel(protocol.Protocol):
|
||||
class ProactChannel:
|
||||
"""Representation of a single proective channel."""
|
||||
def __init__(self, channels: 'ProactChannels', chan_nr: int):
|
||||
self.channels = channels
|
||||
self.chan_nr = chan_nr
|
||||
self.ep = None
|
||||
self.pending_tx = []
|
||||
self.pending_rx = bytearray()
|
||||
|
||||
def write(self, data: bytes):
|
||||
if self.connected:
|
||||
self.transport.write(data)
|
||||
else:
|
||||
self.pending_tx.append(data)
|
||||
|
||||
def dataReceived(self, data: bytes):
|
||||
logger.error(f"Got data (len={len(data)}): {data}")
|
||||
self.pending_rx.extend(data)
|
||||
# Send ENVELOPE with EventDownload Data available
|
||||
event_list_ie = EventList(decoded=[ EventList.Event.data_available])
|
||||
channel_status_ie = ChannelStatus(decoded='8100')
|
||||
channel_data_len_ie = ChannelDataLength(decoded=min(255,len(self.pending_rx)))
|
||||
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
|
||||
event_dl = EventDownload(children=[event_list_ie, dev_ids, channel_status_ie, channel_data_len_ie])
|
||||
# 3) send to the card
|
||||
envelope_hex = b2h(event_dl.to_tlv())
|
||||
logger.info("ENVELOPE Event: %s" % envelope_hex)
|
||||
global g_ms
|
||||
(data, sw) = g_ms.scc.envelope(envelope_hex)
|
||||
logger.info("SW %s: %s" % (sw, data))
|
||||
# FIXME: Handle result?!
|
||||
|
||||
def connectionLost(self, reason):
|
||||
logger.error("connection lost: %s" % reason)
|
||||
|
||||
def close(self):
|
||||
"""Close the channel."""
|
||||
@@ -196,13 +174,14 @@ class Proact(ProactiveHandler):
|
||||
raise ValueError('Unsupported protocol_type')
|
||||
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
|
||||
raise ValueError('Unsupported type_of_address')
|
||||
ipv4_bytes = other_addr_ie.decoded['address']
|
||||
ipv4_bytes = h2b(other_addr_ie.decoded['address'])
|
||||
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
|
||||
port_nr = transp_lvl_ie.decoded['port_number']
|
||||
logger.error("OpenChannel opening with %s:%u" % (ipv4_str, port_nr))
|
||||
print("%s:%u" % (ipv4_str, port_nr))
|
||||
channel = self.channels.channel_create()
|
||||
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
|
||||
d = endpoints.connectProtocol(channel.ep, channel)
|
||||
channel.prot = TcpProtocol()
|
||||
d = endpoints.connectProtocol(channel.ep, channel.prot)
|
||||
# FIXME: why is this never called despite the client showing the inbound connection?
|
||||
d.addCallback(tcp_connected_callback)
|
||||
|
||||
@@ -234,17 +213,6 @@ class Proact(ProactiveHandler):
|
||||
# ]}
|
||||
logger.info("ReceiveData")
|
||||
logger.info(pcmd)
|
||||
dev_id_ie = Proact._find_first_element_of_type(pcmd.children, DeviceIdentities)
|
||||
chan_data_len_ie = Proact._find_first_element_of_type(pcmd.children, ChannelDataLength)
|
||||
len_requested = chan_data_len_ie.decoded
|
||||
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||
chan_nr = 1 # FIXME
|
||||
chan = self.channels.channels.get(chan_nr, None)
|
||||
|
||||
requested = chan.pending_rx[:len_requested]
|
||||
chan.pending_rx = chan.pending_rx[len_requested:]
|
||||
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||
|
||||
# Terminal Response example: [
|
||||
# {'command_details': {'command_number': 1,
|
||||
# 'type_of_command': 'receive_data',
|
||||
@@ -254,8 +222,7 @@ class Proact(ProactiveHandler):
|
||||
# {'channel_data': '16030100040e000000'},
|
||||
# {'channel_data_length': 0}
|
||||
# ]
|
||||
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||
return resp
|
||||
return self.prepare_response(pcmd) + []
|
||||
|
||||
def handle_SendData(self, pcmd: ProactiveCommand):
|
||||
"""Send/write data received from the SIM to the socket."""
|
||||
@@ -273,10 +240,7 @@ class Proact(ProactiveHandler):
|
||||
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||
chan_nr = 1 # FIXME
|
||||
chan = self.channels.channels.get(chan_nr, None)
|
||||
# FIXME
|
||||
logger.error(f"Chan data received: {chan_data_ie.decoded}")
|
||||
chan.write(chan_data_ie.decoded)
|
||||
#chan.write(h2b(chan_data_ie.decoded))
|
||||
# FIXME chan.prot.transport.write(h2b(chan_data_ie.decoded))
|
||||
# Terminal Response example: [
|
||||
# {'command_details': {'command_number': 1,
|
||||
# 'type_of_command': 'send_data',
|
||||
@@ -461,3 +425,4 @@ if __name__ == '__main__':
|
||||
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
|
||||
g_ms.connect_to_card(tp)
|
||||
reactor.run()
|
||||
|
||||
|
||||
@@ -151,7 +151,7 @@ global_group.add_argument('--no-suppress-select', action='store_false', dest='su
|
||||
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
|
||||
help="""
|
||||
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
|
||||
information that was not already received in response to the most recent SEELCT.""")
|
||||
information that was not already received in resposne to the most recent SEELCT.""")
|
||||
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
|
||||
help="""Show the raw APDU in addition to its parsed form.""")
|
||||
|
||||
@@ -188,7 +188,7 @@ parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
|
||||
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||
Read APDUs from a TCA Loader log file.""")
|
||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||
help='Name of the log file to be read')
|
||||
help='Name of te log file to be read')
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
||||
@@ -317,7 +317,7 @@ class ADF_ARAM(CardADF):
|
||||
store_ref_ar_do_parse = argparse.ArgumentParser()
|
||||
# REF-DO
|
||||
store_ref_ar_do_parse.add_argument(
|
||||
'--device-app-id', required=True, help='Identifies the specific device application that the rule applies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
||||
'--device-app-id', required=True, help='Identifies the specific device application that the rule appplies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
||||
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
||||
aid_grp.add_argument(
|
||||
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
|
||||
@@ -399,7 +399,7 @@ class ADF_ARAM(CardADF):
|
||||
sw_aram = {
|
||||
'ARA-M': {
|
||||
'6381': 'Rule successfully stored but an access rule already exists',
|
||||
'6382': 'Rule successfully stored but contained at least one unknown (discarded) BER-TLV',
|
||||
'6382': 'Rule successfully stored bu contained at least one unknown (discarded) BER-TLV',
|
||||
'6581': 'Memory Problem',
|
||||
'6700': 'Wrong Length in Lc',
|
||||
'6981': 'DO is not supported by the ARA-M/ARA-C',
|
||||
|
||||
@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
|
||||
operation with pySim-shell.
|
||||
"""
|
||||
|
||||
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
|
||||
# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier, Harald Welte
|
||||
@@ -31,161 +31,128 @@ operation with pySim-shell.
|
||||
from typing import List, Dict, Optional
|
||||
from Cryptodome.Cipher import AES
|
||||
from osmocom.utils import h2b, b2h
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
import abc
|
||||
import csv
|
||||
import logging
|
||||
|
||||
log = PySimLogger.get("CARDKEY")
|
||||
|
||||
card_key_providers = [] # type: List['CardKeyProvider']
|
||||
|
||||
class CardKeyFieldCryptor:
|
||||
"""
|
||||
A Card key field encryption class that may be used by Card key provider implementations to add support for
|
||||
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.).
|
||||
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key"
|
||||
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt
|
||||
the key material on demand.
|
||||
"""
|
||||
|
||||
# well-known groups of columns relate to a given functionality. This avoids having
|
||||
# to specify the same transport key N number of times, if the same key is used for multiple
|
||||
# fields of one group, like KIC+KID+KID of one SD.
|
||||
__CRYPT_GROUPS = {
|
||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||
# well-known groups of columns relate to a given functionality. This avoids having
|
||||
# to specify the same transport key N number of times, if the same key is used for multiple
|
||||
# fields of one group, like KIC+KID+KID of one SD.
|
||||
CRYPT_GROUPS = {
|
||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||
}
|
||||
|
||||
__IV = b'\x23' * 16
|
||||
|
||||
@staticmethod
|
||||
def __dict_keys_to_upper(d: dict) -> dict:
|
||||
return {k.upper():v for k,v in d.items()}
|
||||
|
||||
@staticmethod
|
||||
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
|
||||
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
||||
new_dict = {}
|
||||
for name, key in transport_keys.items():
|
||||
if name in crypt_groups:
|
||||
for field in crypt_groups[name]:
|
||||
new_dict[field] = key
|
||||
else:
|
||||
new_dict[name] = key
|
||||
return new_dict
|
||||
|
||||
def __init__(self, transport_keys: dict):
|
||||
"""
|
||||
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
|
||||
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
|
||||
|
||||
Args:
|
||||
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
||||
respective field (column) of the CSV. This is done so that different fields
|
||||
(columns) can use different transport keys, which is strongly recommended by
|
||||
GSMA FS.28
|
||||
"""
|
||||
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
|
||||
self.__CRYPT_GROUPS)
|
||||
for name, key in self.transport_keys.items():
|
||||
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
|
||||
|
||||
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
||||
"""
|
||||
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
|
||||
field name, otherwise the field is treated as plaintext and passed through as it is.
|
||||
|
||||
Args:
|
||||
field_name : name of the field to decrypt (used to identify which key to use)
|
||||
encrypted_val : encrypted field value
|
||||
|
||||
Returns:
|
||||
plaintext field value
|
||||
"""
|
||||
if not field_name.upper() in self.transport_keys:
|
||||
return encrypted_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
||||
|
||||
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
|
||||
"""
|
||||
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
|
||||
field name, otherwise the field is treated as non sensitive and passed through as it is.
|
||||
|
||||
Args:
|
||||
field_name : name of the field to decrypt (used to identify which key to use)
|
||||
encrypted_val : encrypted field value
|
||||
|
||||
Returns:
|
||||
plaintext field value
|
||||
"""
|
||||
if not field_name.upper() in self.transport_keys:
|
||||
return plaintext_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||
return b2h(cipher.encrypt(h2b(plaintext_val)))
|
||||
|
||||
class CardKeyProvider(abc.ABC):
|
||||
"""Base class, not containing any concrete implementation."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
"""
|
||||
Get multiple card-individual fields for identified card. This method should not fail with an exception in
|
||||
case the entry, columns or even the key column itsself is not found.
|
||||
VALID_KEY_FIELD_NAMES = ['ICCID', 'EID', 'IMSI' ]
|
||||
|
||||
# check input parameters, but do nothing concrete yet
|
||||
def _verify_get_data(self, fields: List[str] = [], key: str = 'ICCID', value: str = "") -> Dict[str, str]:
|
||||
"""Verify multiple fields for identified card.
|
||||
|
||||
Args:
|
||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
||||
key : look-up key to identify card data, such as 'ICCID'
|
||||
value : value for look-up key to identify card data
|
||||
Returns:
|
||||
dictionary of {field : value, ...} strings for each requested field from 'fields'. In case nothing is
|
||||
fond None shall be returned.
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
|
||||
if key not in self.VALID_KEY_FIELD_NAMES:
|
||||
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
|
||||
(key, str(self.VALID_KEY_FIELD_NAMES)))
|
||||
|
||||
return {}
|
||||
|
||||
def get_field(self, field: str, key: str = 'ICCID', value: str = "") -> Optional[str]:
|
||||
"""get a single field from CSV file using a specified key/value pair"""
|
||||
fields = [field]
|
||||
result = self.get(fields, key, value)
|
||||
return result.get(field)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
"""Get multiple card-individual fields for identified card.
|
||||
|
||||
Args:
|
||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
||||
key : look-up key to identify card data, such as 'ICCID'
|
||||
value : value for look-up key to identify card data
|
||||
Returns:
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return type(self).__name__
|
||||
|
||||
class CardKeyProviderCsv(CardKeyProvider):
|
||||
"""Card key provider implementation that allows to query against a specified CSV file."""
|
||||
"""Card key provider implementation that allows to query against a specified CSV file.
|
||||
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
|
||||
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
|
||||
known as "transport key" (see GSMA FS.28)."""
|
||||
IV = b'\x23' * 16
|
||||
csv_file = None
|
||||
filename = None
|
||||
|
||||
def __init__(self, csv_filename: str, transport_keys: dict):
|
||||
def __init__(self, filename: str, transport_keys: dict):
|
||||
"""
|
||||
Args:
|
||||
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : (see class CardKeyFieldCryptor)
|
||||
filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
||||
respective field (column) of the CSV. This is done so that different fields
|
||||
(columns) can use different transport keys, which is strongly recommended by
|
||||
GSMA FS.28
|
||||
"""
|
||||
self.csv_file = open(csv_filename, 'r')
|
||||
self.csv_file = open(filename, 'r')
|
||||
if not self.csv_file:
|
||||
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||
self.csv_filename = csv_filename
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
raise RuntimeError("Could not open CSV file '%s'" % filename)
|
||||
self.filename = filename
|
||||
self.transport_keys = self.process_transport_keys(transport_keys)
|
||||
|
||||
@staticmethod
|
||||
def process_transport_keys(transport_keys: dict):
|
||||
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
||||
new_dict = {}
|
||||
for name, key in transport_keys.items():
|
||||
if name in CRYPT_GROUPS:
|
||||
for field in CRYPT_GROUPS[name]:
|
||||
new_dict[field] = key
|
||||
else:
|
||||
new_dict[name] = key
|
||||
return new_dict
|
||||
|
||||
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
||||
"""decrypt a single field, if we have a transport key for the field of that name."""
|
||||
if not field_name in self.transport_keys:
|
||||
return encrypted_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
|
||||
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
||||
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
super()._verify_get_data(fields, key, value)
|
||||
|
||||
self.csv_file.seek(0)
|
||||
cr = csv.DictReader(self.csv_file)
|
||||
if not cr:
|
||||
raise RuntimeError("Could not open DictReader for CSV-File '%s'" % self.csv_filename)
|
||||
raise RuntimeError(
|
||||
"Could not open DictReader for CSV-File '%s'" % self.filename)
|
||||
cr.fieldnames = [field.upper() for field in cr.fieldnames]
|
||||
|
||||
if key not in cr.fieldnames:
|
||||
return None
|
||||
return_dict = {}
|
||||
rc = {}
|
||||
for row in cr:
|
||||
if row[key] == value:
|
||||
for f in fields:
|
||||
if f in row:
|
||||
return_dict.update({f: self.crypt.decrypt_field(f, row[f])})
|
||||
rc.update({f: self._decrypt_field(f, row[f])})
|
||||
else:
|
||||
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.csv_filename, f))
|
||||
if return_dict == {}:
|
||||
return None
|
||||
return return_dict
|
||||
|
||||
raise RuntimeError("CSV-File '%s' lacks column '%s'" %
|
||||
(self.filename, f))
|
||||
return rc
|
||||
|
||||
|
||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||
@@ -196,11 +163,11 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
|
||||
provider_list : override the list of providers from the global default
|
||||
"""
|
||||
if not isinstance(provider, CardKeyProvider):
|
||||
raise ValueError("provider is not a card data provider")
|
||||
raise ValueError("provider is not a card data provier")
|
||||
provider_list.append(provider)
|
||||
|
||||
|
||||
def card_key_provider_get(fields: list[str], key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
||||
def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
||||
"""Query all registered card data providers for card-individual [key] data.
|
||||
|
||||
Args:
|
||||
@@ -211,21 +178,17 @@ def card_key_provider_get(fields: list[str], key: str, value: str, provider_list
|
||||
Returns:
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
key = key.upper()
|
||||
fields = [f.upper() for f in fields]
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError("Provider list contains element which is not a card data provider")
|
||||
log.debug("Searching for card key data (key=%s, value=%s, provider=%s)" % (key, value, str(p)))
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provier")
|
||||
result = p.get(fields, key, value)
|
||||
if result:
|
||||
log.debug("Found card data: %s" % (str(result)))
|
||||
return result
|
||||
|
||||
raise ValueError("Unable to find card key data (key=%s, value=%s, fields=%s)" % (key, value, str(fields)))
|
||||
return {}
|
||||
|
||||
|
||||
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> str:
|
||||
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> Optional[str]:
|
||||
"""Query all registered card data providers for a single field.
|
||||
|
||||
Args:
|
||||
@@ -236,7 +199,11 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
|
||||
Returns:
|
||||
dictionary of {field, value} strings for the requested field
|
||||
"""
|
||||
|
||||
fields = [field]
|
||||
result = card_key_provider_get(fields, key, value, card_key_providers)
|
||||
return result.get(field.upper())
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provier")
|
||||
result = p.get_field(field, key, value)
|
||||
if result:
|
||||
return result
|
||||
return None
|
||||
|
||||
@@ -112,8 +112,6 @@ class UiccCardBase(SimCardBase):
|
||||
def probe(self) -> bool:
|
||||
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
|
||||
ef_dir = EF_DIR()
|
||||
# select MF first
|
||||
self.file_exists("3f00")
|
||||
return self.file_exists(ef_dir.fid)
|
||||
|
||||
def read_aids(self) -> List[Hexstr]:
|
||||
|
||||
@@ -316,19 +316,19 @@ class FileList(COMPR_TLV_IE, tag=0x92):
|
||||
_construct = Struct('number_of_files'/Int8ub,
|
||||
'files'/GreedyRange(FileId))
|
||||
|
||||
# TS 102 223 Section 8.19
|
||||
# TS 102 223 Secton 8.19
|
||||
class LocationInformation(COMPR_TLV_IE, tag=0x93):
|
||||
pass
|
||||
|
||||
# TS 102 223 Section 8.20
|
||||
# TS 102 223 Secton 8.20
|
||||
class IMEI(COMPR_TLV_IE, tag=0x94):
|
||||
_construct = BcdAdapter(GreedyBytes)
|
||||
|
||||
# TS 102 223 Section 8.21
|
||||
# TS 102 223 Secton 8.21
|
||||
class HelpRequest(COMPR_TLV_IE, tag=0x95):
|
||||
pass
|
||||
|
||||
# TS 102 223 Section 8.22
|
||||
# TS 102 223 Secton 8.22
|
||||
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
|
||||
_construct = BcdAdapter(GreedyBytes)
|
||||
|
||||
|
||||
@@ -141,7 +141,7 @@ class SimCardCommands:
|
||||
Returns:
|
||||
Tuple of (decoded_data, sw)
|
||||
"""
|
||||
cmd = cmd_constr.build(cmd_data) if cmd_data else b''
|
||||
cmd = cmd_constr.build(cmd_data) if cmd_data else ''
|
||||
lc = i2h([len(cmd)]) if cmd_data else ''
|
||||
le = '00' if resp_constr else ''
|
||||
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
|
||||
@@ -285,7 +285,7 @@ class SimCardCommands:
|
||||
return self.send_apdu_checksw(self.cla_byte + "a40304")
|
||||
|
||||
def select_adf(self, aid: Hexstr) -> ResTuple:
|
||||
"""Execute SELECT a given Application ADF.
|
||||
"""Execute SELECT a given Applicaiton ADF.
|
||||
|
||||
Args:
|
||||
aid : application identifier as hex string
|
||||
@@ -577,7 +577,7 @@ class SimCardCommands:
|
||||
|
||||
Args:
|
||||
rand : 16 byte random data as hex string (RAND)
|
||||
autn : 8 byte Authentication Token (AUTN)
|
||||
autn : 8 byte Autentication Token (AUTN)
|
||||
context : 16 byte random data ('3g' or 'gsm')
|
||||
"""
|
||||
# 3GPP TS 31.102 Section 7.1.2.1
|
||||
|
||||
@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
|
||||
block_nr = self.block_nr
|
||||
ciphertext = self._encrypt(padded_data)
|
||||
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
||||
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
|
||||
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
|
||||
return ciphertext
|
||||
|
||||
def decrypt(self, data:bytes) -> bytes:
|
||||
@@ -149,20 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
|
||||
temp_data = self.mac_chain + tag_and_length + data
|
||||
old_mcv = self.mac_chain
|
||||
c_mac = self._auth(temp_data)
|
||||
|
||||
|
||||
# DEBUG: Show MAC computation details
|
||||
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||
|
||||
print(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||
print(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||
print(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||
print(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||
print(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||
|
||||
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
||||
ret = tag_and_length + data + c_mac
|
||||
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||
|
||||
print(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||
|
||||
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
||||
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
|
||||
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
|
||||
return ret
|
||||
|
||||
def verify(self, ciphertext: bytes) -> bool:
|
||||
@@ -213,11 +213,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
|
||||
initial_mac_chaining_value = out[0:l]
|
||||
s_enc = out[l:2*l]
|
||||
s_mac = out[l*2:3*l]
|
||||
|
||||
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||
|
||||
print(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||
print(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||
print(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||
print(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||
|
||||
return s_enc, s_mac, initial_mac_chaining_value
|
||||
|
||||
@@ -246,21 +246,21 @@ class BspInstance:
|
||||
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||
assert tag <= 255
|
||||
assert len(plaintext) <= self.max_payload_size
|
||||
|
||||
|
||||
# DEBUG: Show what we're processing
|
||||
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
|
||||
print(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||
print(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||
print(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||
print(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
|
||||
ciphered = self.c_algo.encrypt(plaintext)
|
||||
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||
|
||||
print(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||
|
||||
maced = self.m_algo.auth(tag, ciphered)
|
||||
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||
|
||||
print(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||
print(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||
|
||||
return maced
|
||||
|
||||
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
||||
@@ -282,7 +282,7 @@ class BspInstance:
|
||||
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
|
||||
"""MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||
assert tag <= 255
|
||||
assert len(plaintext) <= self.max_payload_size
|
||||
assert len(plaintext) < self.max_payload_size
|
||||
maced = self.m_algo.auth(tag, plaintext)
|
||||
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
|
||||
self.c_algo.block_nr += 1
|
||||
|
||||
@@ -116,7 +116,7 @@ class param:
|
||||
pass
|
||||
|
||||
class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
"""Base class for representing an ES2+ API Function."""
|
||||
"""Base classs for representing an ES2+ API Function."""
|
||||
pass
|
||||
|
||||
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
|
||||
|
||||
@@ -25,9 +25,6 @@ import pySim.esim.rsp as rsp
|
||||
from pySim.esim.bsp import BspInstance
|
||||
from pySim.esim import PMO
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
||||
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
||||
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
||||
@@ -76,11 +73,10 @@ def gen_replace_session_keys(ppk_enc: bytes, ppk_cmac: bytes, initial_mcv: bytes
|
||||
class ProfileMetadata:
|
||||
"""Representation of Profile metadata. Right now only the mandatory bits are
|
||||
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
|
||||
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str, profile_class = 'operational'):
|
||||
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str):
|
||||
self.iccid_bin = iccid_bin
|
||||
self.spn = spn
|
||||
self.profile_name = profile_name
|
||||
self.profile_class = profile_class
|
||||
self.icon = None
|
||||
self.icon_type = None
|
||||
self.notifications = []
|
||||
@@ -106,14 +102,6 @@ class ProfileMetadata:
|
||||
'serviceProviderName': self.spn,
|
||||
'profileName': self.profile_name,
|
||||
}
|
||||
if self.profile_class == 'test':
|
||||
smr['profileClass'] = 0
|
||||
elif self.profile_class == 'provisioning':
|
||||
smr['profileClass'] = 1
|
||||
elif self.profile_class == 'operational':
|
||||
smr['profileClass'] = 2
|
||||
else:
|
||||
raise ValueError('Unsupported Profile Class %s' % self.profile_class)
|
||||
if self.icon:
|
||||
smr['icon'] = self.icon
|
||||
smr['iconType'] = self.icon_type
|
||||
@@ -208,12 +196,12 @@ class BoundProfilePackage(ProfilePackage):
|
||||
# 'initialiseSecureChannelRequest'
|
||||
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
||||
# firstSequenceOf87
|
||||
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||
print(f"BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||
print(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||
print(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
||||
# sequenceOF88
|
||||
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||
print(f"BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
||||
|
||||
if self.ppp: # we have to use session keys
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""GSMA eSIM RSP ES9+ interface according to SGP.22 v2.5"""
|
||||
"""GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5"""
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
|
||||
@@ -159,7 +159,7 @@ class ApiError(Exception):
|
||||
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
"""Base classs for representing an HTTP[s] API Function."""
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
path = None
|
||||
|
||||
@@ -90,61 +90,13 @@ class RspSessionState:
|
||||
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
|
||||
del state['_smdp_otsk']
|
||||
del state['_smdp_ot_curve']
|
||||
# automatically recover all the remaining state
|
||||
# automatically recover all the remainig state
|
||||
self.__dict__.update(state)
|
||||
|
||||
|
||||
class RspSessionStore:
|
||||
"""A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
|
||||
Can be configured to use either file-based storage or in-memory storage.
|
||||
We use it to store RspSessionState objects indexed by transactionId."""
|
||||
|
||||
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
|
||||
self._in_memory = in_memory
|
||||
|
||||
if in_memory:
|
||||
self._shelf = shelve.Shelf(dict())
|
||||
else:
|
||||
if filename is None:
|
||||
raise ValueError("filename is required for file-based session store")
|
||||
self._shelf = shelve.open(filename)
|
||||
|
||||
# dunder magic
|
||||
def __getitem__(self, key):
|
||||
return self._shelf[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._shelf[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self._shelf[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._shelf
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._shelf)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._shelf)
|
||||
|
||||
# everything else
|
||||
def __getattr__(self, name):
|
||||
"""Delegate attribute access to the underlying shelf object."""
|
||||
return getattr(self._shelf, name)
|
||||
|
||||
def close(self):
|
||||
"""Close the session store."""
|
||||
if hasattr(self._shelf, 'close'):
|
||||
self._shelf.close()
|
||||
if self._in_memory:
|
||||
# For in-memory store, clear the reference
|
||||
self._shelf = None
|
||||
|
||||
def sync(self):
|
||||
"""Synchronize the cache with the underlying storage."""
|
||||
if hasattr(self._shelf, 'sync'):
|
||||
self._shelf.sync()
|
||||
class RspSessionStore(shelve.DbfilenameShelf):
|
||||
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
|
||||
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
|
||||
|
||||
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
|
||||
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This
|
||||
|
||||
@@ -334,7 +334,7 @@ class File:
|
||||
self.fill_pattern = pefi['fillPattern']
|
||||
self.fill_pattern_repeat = False
|
||||
elif fdb_dec['file_type'] == 'df':
|
||||
# only set it, if an earlier call to from_template() didn't already set it, as
|
||||
# only set it, if an earlier call to from_template() didn't alrady set it, as
|
||||
# the template can differentiate between MF, DF and ADF (unlike FDB)
|
||||
if not self.file_type:
|
||||
self.file_type = 'DF'
|
||||
@@ -427,7 +427,7 @@ class File:
|
||||
|
||||
class ProfileElement:
|
||||
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
|
||||
but it's more likely sub-classed with a specific class for the specific profile element type, like e.g
|
||||
but ist more likely sub-classed with a specific class for the specific profile element type, like e.g
|
||||
ProfileElementHeader, ProfileElementMF, ...
|
||||
"""
|
||||
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
|
||||
@@ -440,7 +440,7 @@ class ProfileElement:
|
||||
'genericFileManagement': 'gfm-header',
|
||||
'akaParameter': 'aka-header',
|
||||
'cdmaParameter': 'cdma-header',
|
||||
# note how they couldn't even consistently capitalize the 'header' suffix :(
|
||||
# note how they couldn't even consistently captialize the 'header' suffix :(
|
||||
'application': 'app-Header',
|
||||
'pukCodes': 'puk-Header',
|
||||
'pinCodes': 'pin-Header',
|
||||
@@ -628,7 +628,7 @@ class FsProfileElement(ProfileElement):
|
||||
# this is a template that belongs into the [A]DF of another template
|
||||
# 1) find the PE for the referenced template
|
||||
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
|
||||
# 2) resolve the [A]DF that forms the base of that parent PE
|
||||
# 2) resolve te [A]DF that forms the base of that parent PE
|
||||
pe_df = parent_pe.files[template.parent.base_df().pe_name].node
|
||||
self.pe_sequence.cur_df = pe_df
|
||||
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
|
||||
@@ -649,7 +649,7 @@ class FsProfileElement(ProfileElement):
|
||||
self.add_file(file)
|
||||
|
||||
def create_file(self, pename: str) -> File:
|
||||
"""Programmatically create a file by its PE-Name."""
|
||||
"""Programatically create a file by its PE-Name."""
|
||||
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
|
||||
file = File(pename, None, template.files_by_pename.get(pename, None))
|
||||
self.add_file(file)
|
||||
@@ -1409,7 +1409,7 @@ class ProfileElementHeader(ProfileElement):
|
||||
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
|
||||
**kwargs):
|
||||
"""You would usually initialize an instance either with a "decoded" argument (as read from
|
||||
a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're
|
||||
a DER-encoded SAIP file via asn1tools), or [some of] the othe arguments in case you're
|
||||
constructing a Profile Header from scratch.
|
||||
|
||||
Args:
|
||||
@@ -1562,7 +1562,7 @@ class ProfileElementSequence:
|
||||
|
||||
def _rebuild_pes_by_naa(self) -> None:
|
||||
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
|
||||
which basically means for every NAA there's a list of instances, and each consists
|
||||
which basically means for every NAA there's a lsit of instances, and each consists
|
||||
of a list of a list of PEs."""
|
||||
self.pres_by_naa = {}
|
||||
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
|
||||
@@ -1690,7 +1690,7 @@ class ProfileElementSequence:
|
||||
i += 1
|
||||
|
||||
def get_index_by_pe(self, pe: ProfileElement) -> int:
|
||||
"""Return a list with the indices of all instances of PEs of petype."""
|
||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
||||
ret = []
|
||||
i = 0
|
||||
for cur in self.pe_list:
|
||||
@@ -1711,7 +1711,7 @@ class ProfileElementSequence:
|
||||
self.insert_at_index(idx+1, pe_new)
|
||||
|
||||
def get_index_by_type(self, petype: str) -> List[int]:
|
||||
"""Return a list with the indices of all instances of PEs of petype."""
|
||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
||||
ret = []
|
||||
i = 0
|
||||
for pe in self.pe_list:
|
||||
@@ -1736,7 +1736,7 @@ class ProfileElementSequence:
|
||||
for service in naa.mandatory_services:
|
||||
if service in hdr.decoded['eUICC-Mandatory-services']:
|
||||
del hdr.decoded['eUICC-Mandatory-services'][service]
|
||||
# remove any associated mandatory filesystem templates
|
||||
# remove any associaed mandatory filesystem templates
|
||||
for template in naa.templates:
|
||||
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
|
||||
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
||||
@@ -2040,8 +2040,7 @@ class FsNodeADF(FsNodeDF):
|
||||
super().__init__(fid, parent, file, name)
|
||||
|
||||
def __str__(self):
|
||||
# self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it
|
||||
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
|
||||
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name))
|
||||
|
||||
class FsNodeMF(FsNodeDF):
|
||||
"""The MF (Master File) in the filesystem hierarchy."""
|
||||
|
||||
@@ -68,7 +68,7 @@ class CheckBasicStructure(ProfileConstraintChecker):
|
||||
|
||||
def check_optional_ordering(self, pes: ProfileElementSequence):
|
||||
"""Check the ordering of optional PEs following the respective mandatory ones."""
|
||||
# ordering and required dependencies
|
||||
# ordering and required depenencies
|
||||
self._is_after_if_exists(pes,'opt-usim', 'usim')
|
||||
self._is_after_if_exists(pes,'opt-isim', 'isim')
|
||||
self._is_after_if_exists(pes,'gsm-access', 'usim')
|
||||
|
||||
@@ -149,7 +149,7 @@ class CertificateSet:
|
||||
check_signed(c, self.root_cert)
|
||||
return
|
||||
parent_cert = self.intermediate_certs.get(aki, None)
|
||||
if not parent_cert:
|
||||
if not aki:
|
||||
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
|
||||
check_signed(c, parent_cert)
|
||||
# if we reach here, we passed (no exception raised)
|
||||
|
||||
@@ -39,7 +39,7 @@ from osmocom.utils import h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, i
|
||||
from osmocom.tlv import bertlv_parse_one
|
||||
from osmocom.construct import filter_dict, parse_construct, build_construct
|
||||
|
||||
from pySim.utils import sw_match, decomposeATR
|
||||
from pySim.utils import sw_match
|
||||
from pySim.jsonpath import js_path_modify
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.exceptions import SwMatchError
|
||||
@@ -86,7 +86,7 @@ class CardFile:
|
||||
self.service = service
|
||||
self.shell_commands = [] # type: List[CommandSet]
|
||||
|
||||
# Note: the basic properties (fid, name, etc.) are verified when
|
||||
# Note: the basic properties (fid, name, ect.) are verified when
|
||||
# the file is attached to a parent file. See method add_file() in
|
||||
# class Card DF
|
||||
|
||||
@@ -266,7 +266,7 @@ class CardFile:
|
||||
def get_profile(self):
|
||||
"""Get the profile associated with this file. If this file does not have any
|
||||
profile assigned, try to find a file above (usually the MF) in the filesystem
|
||||
hierarchy that has a profile assigned
|
||||
hirarchy that has a profile assigned
|
||||
"""
|
||||
|
||||
# If we have a profile set, return it
|
||||
@@ -679,7 +679,7 @@ class TransparentEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
size : tuple of (minimum_size, recommended_size)
|
||||
@@ -982,11 +982,11 @@ class LinFixedEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
rec_len : Tuple of (minimum_length, recommended_length)
|
||||
leftpad: On write, data must be padded from the left to fit physical record length
|
||||
leftpad: On write, data must be padded from the left to fit pysical record length
|
||||
"""
|
||||
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
|
||||
self.rec_len = rec_len
|
||||
@@ -1422,7 +1422,7 @@ class BerTlvEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
size : tuple of (minimum_size, recommended_size)
|
||||
@@ -1455,7 +1455,7 @@ class BerTlvEF(CardEF):
|
||||
export_str += "delete_all\n"
|
||||
for t in tags:
|
||||
result = lchan.retrieve_data(t)
|
||||
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
||||
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
|
||||
return export_str.strip()
|
||||
|
||||
@@ -1495,7 +1495,7 @@ class CardApplication:
|
||||
self.name = name
|
||||
self.adf = adf
|
||||
self.sw = sw or {}
|
||||
# back-reference from ADF to Application
|
||||
# back-reference from ADF to Applicaiton
|
||||
if self.adf:
|
||||
self.aid = aid or self.adf.aid
|
||||
self.adf.application = self
|
||||
@@ -1545,13 +1545,6 @@ class CardModel(abc.ABC):
|
||||
if atr == card_atr:
|
||||
print("Detected CardModel:", cls.__name__)
|
||||
return True
|
||||
# if nothing found try to just compare the Historical Bytes of the ATR
|
||||
card_atr_hb = decomposeATR(card_atr)['hb']
|
||||
for atr in cls._atrs:
|
||||
atr_hb = decomposeATR(atr)['hb']
|
||||
if atr_hb == card_atr_hb:
|
||||
print("Detected CardModel:", cls.__name__)
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
@@ -1572,7 +1565,7 @@ class Path:
|
||||
p = p.split('/')
|
||||
elif len(p) and isinstance(p[0], int):
|
||||
p = ['%04x' % x for x in p]
|
||||
# make sure internal representation always is uppercase only
|
||||
# make sure internal representation alwas is uppercase only
|
||||
self.list = [x.upper() for x in p]
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
||||
@@ -627,7 +627,7 @@ class ADF_SD(CardADF):
|
||||
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
|
||||
kcv = b2h(kcv_bin)
|
||||
if self._cmd.lchan.scc.scp:
|
||||
# encrypted key data with DEK of current SCP
|
||||
# encrypte key data with DEK of current SCP
|
||||
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
|
||||
else:
|
||||
# (for example) during personalization, DEK might not be required)
|
||||
@@ -755,7 +755,7 @@ class ADF_SD(CardADF):
|
||||
|
||||
inst_load_parser = argparse.ArgumentParser()
|
||||
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
|
||||
help='AID of the loaded file')
|
||||
help='AID of the loded file')
|
||||
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
|
||||
help='AID of the Security Domain into which the file shalle be added')
|
||||
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
|
||||
@@ -845,7 +845,7 @@ class ADF_SD(CardADF):
|
||||
# TODO:tune chunk_len based on the overhead of the used SCP?
|
||||
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
|
||||
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
|
||||
# transfer this in various chunks to the card
|
||||
# transfer this in vaious chunks to the card
|
||||
total_size = len(remainder)
|
||||
block_nr = 0
|
||||
while len(remainder):
|
||||
|
||||
@@ -104,4 +104,4 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
||||
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
|
||||
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
|
||||
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
|
||||
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s support
|
||||
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s upport
|
||||
|
||||
@@ -97,7 +97,7 @@ class CapFile():
|
||||
raise ValueError("invalid cap file, %s missing!" % required_components[component])
|
||||
|
||||
def get_loadfile(self) -> bytes:
|
||||
"""Get the executable loadfile as hexstring"""
|
||||
"""Get the executeable loadfile as hexstring"""
|
||||
# Concatenate all cap file components in the specified order
|
||||
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
|
||||
loadfile = self.__component['Header']
|
||||
|
||||
@@ -495,7 +495,7 @@ class IsimCard(UiccCardBase):
|
||||
|
||||
class MagicSimBase(abc.ABC, SimCard):
|
||||
"""
|
||||
These cards uses several record based EFs to store the provider infos,
|
||||
Theses cards uses several record based EFs to store the provider infos,
|
||||
each possible provider uses a specific record number in each EF. The
|
||||
indexes used are ( where N is the number of providers supported ) :
|
||||
- [2 .. N+1] for the operator name
|
||||
@@ -644,7 +644,7 @@ class MagicSim(MagicSimBase):
|
||||
|
||||
class FakeMagicSim(SimCard):
|
||||
"""
|
||||
These cards have a record based EF 3f00/000c that contains the provider
|
||||
Theses cards have a record based EF 3f00/000c that contains the provider
|
||||
information. See the program method for its format. The records go from
|
||||
1 to N.
|
||||
"""
|
||||
|
||||
@@ -296,7 +296,7 @@ def dec_addr_tlv(hexstr):
|
||||
|
||||
elif addr_type == 0x01: # IPv4
|
||||
# Skip address tye byte i.e. first byte in value list
|
||||
# Skip the unused byte in Octet 4 after address type byte as per 3GPP TS 31.102
|
||||
# Skip the unused byte in Octect 4 after address type byte as per 3GPP TS 31.102
|
||||
ipv4 = tlv[2][2:]
|
||||
content = '.'.join(str(x) for x in ipv4)
|
||||
return (content, '01')
|
||||
|
||||
125
pySim/log.py
125
pySim/log.py
@@ -1,125 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" pySim: Logging
|
||||
"""
|
||||
|
||||
#
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import logging
|
||||
from cmd2 import style
|
||||
|
||||
class _PySimLogHandler(logging.Handler):
|
||||
def __init__(self, log_callback):
|
||||
super().__init__()
|
||||
self.log_callback = log_callback
|
||||
|
||||
def emit(self, record):
|
||||
formatted_message = self.format(record)
|
||||
self.log_callback(formatted_message, record)
|
||||
|
||||
class PySimLogger:
|
||||
"""
|
||||
Static class to centralize the log output of PySim applications. This class can be used to print log messages from
|
||||
any pySim module. Configuration of the log behaviour (see setup and set_ methods) is entirely optional. In case no
|
||||
print callback is set (see setup method), the logger will pass the log messages directly to print() without applying
|
||||
any formatting to the original log message.
|
||||
"""
|
||||
|
||||
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
||||
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
|
||||
__formatter = logging.Formatter(LOG_FMTSTR)
|
||||
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
||||
|
||||
# No print callback by default, means that log messages are passed directly to print()
|
||||
print_callback = None
|
||||
|
||||
# No specific color scheme by default
|
||||
colors = {}
|
||||
|
||||
# The logging default is non-verbose logging on logging level DEBUG. This is a safe default that works for
|
||||
# applications that ignore the presence of the PySimLogger class.
|
||||
verbose = False
|
||||
logging.root.setLevel(logging.DEBUG)
|
||||
|
||||
def __init__(self):
|
||||
raise RuntimeError('static class, do not instantiate')
|
||||
|
||||
@staticmethod
|
||||
def setup(print_callback = None, colors:dict = {}):
|
||||
"""
|
||||
Set a print callback function and color scheme. This function call is optional. In case this method is not
|
||||
called, default settings apply.
|
||||
Args:
|
||||
print_callback : A callback function that accepts the resulting log string as input. The callback should
|
||||
have the following format: print_callback(message:str)
|
||||
colors : An optional dict through which certain log levels can be assigned a color.
|
||||
(e.g. {logging.WARN: YELLOW})
|
||||
"""
|
||||
PySimLogger.print_callback = print_callback
|
||||
PySimLogger.colors = colors
|
||||
|
||||
@staticmethod
|
||||
def set_verbose(verbose:bool = False):
|
||||
"""
|
||||
Enable/disable verbose logging. (has no effect in case no print callback is set, see method setup)
|
||||
Args:
|
||||
verbose: verbosity (True = verbose logging, False = normal logging)
|
||||
"""
|
||||
PySimLogger.verbose = verbose;
|
||||
|
||||
@staticmethod
|
||||
def set_level(level:int = logging.DEBUG):
|
||||
"""
|
||||
Set the logging level.
|
||||
Args:
|
||||
level: Logging level, valis log leves are: DEBUG, INFO, WARNING, ERROR and CRITICAL
|
||||
"""
|
||||
logging.root.setLevel(level)
|
||||
|
||||
@staticmethod
|
||||
def _log_callback(message, record):
|
||||
if not PySimLogger.print_callback:
|
||||
# In case no print callback has been set display the message as if it were printed trough a normal
|
||||
# python print statement.
|
||||
print(record.message)
|
||||
else:
|
||||
# When a print callback is set, use it to display the log line. Apply color if the API user chose one
|
||||
if PySimLogger.verbose:
|
||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter_verbose, record)
|
||||
else:
|
||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
||||
color = PySimLogger.colors.get(record.levelno)
|
||||
if color:
|
||||
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||
else:
|
||||
PySimLogger.print_callback(formatted_message)
|
||||
|
||||
@staticmethod
|
||||
def get(log_facility: str):
|
||||
"""
|
||||
Set up and return a new python logger object
|
||||
Args:
|
||||
log_facility : Name of log facility (e.g. "MAIN", "RUNTIME"...)
|
||||
"""
|
||||
logger = logging.getLogger(log_facility)
|
||||
handler = _PySimLogHandler(log_callback=PySimLogger._log_callback)
|
||||
logger.addHandler(handler)
|
||||
return logger
|
||||
@@ -110,7 +110,7 @@ class CardProfile:
|
||||
@abc.abstractmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
"""Try to see if the specific profile matches the card. This method is a
|
||||
placeholder that is overloaded by specific derived classes. The method
|
||||
placeholder that is overloaded by specific dirived classes. The method
|
||||
actively probes the card to make sure the profile class matches the
|
||||
physical card. This usually also means that the card is reset during
|
||||
the process, so this method must not be called at random times. It may
|
||||
|
||||
@@ -23,9 +23,6 @@ from osmocom.tlv import bertlv_parse_one
|
||||
|
||||
from pySim.exceptions import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
log = PySimLogger.get("RUNTIME")
|
||||
|
||||
def lchan_nr_from_cla(cla: int) -> int:
|
||||
"""Resolve the logical channel number from the CLA byte."""
|
||||
@@ -47,7 +44,6 @@ class RuntimeState:
|
||||
card : pysim.cards.Card instance
|
||||
profile : CardProfile instance
|
||||
"""
|
||||
|
||||
self.mf = CardMF(profile=profile)
|
||||
self.card = card
|
||||
self.profile = profile
|
||||
@@ -64,13 +60,10 @@ class RuntimeState:
|
||||
self.card.set_apdu_parameter(
|
||||
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
|
||||
|
||||
# make sure MF is selected before probing for Addons
|
||||
self.lchan[0].select('MF')
|
||||
|
||||
for addon_cls in self.profile.addons:
|
||||
addon = addon_cls()
|
||||
if addon.probe(self.card):
|
||||
log.info("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
||||
print("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
||||
for f in addon.files_in_mf:
|
||||
self.mf.add_file(f)
|
||||
|
||||
@@ -104,18 +97,18 @@ class RuntimeState:
|
||||
apps_taken = []
|
||||
if aids_card:
|
||||
aids_taken = []
|
||||
log.info("AIDs on card:")
|
||||
print("AIDs on card:")
|
||||
for a in aids_card:
|
||||
for f in apps_profile:
|
||||
if f.aid in a:
|
||||
log.info(" %s: %s (EF.DIR)" % (f.name, a))
|
||||
print(" %s: %s (EF.DIR)" % (f.name, a))
|
||||
aids_taken.append(a)
|
||||
apps_taken.append(f)
|
||||
aids_unknown = set(aids_card) - set(aids_taken)
|
||||
for a in aids_unknown:
|
||||
log.info(" unknown: %s (EF.DIR)" % a)
|
||||
print(" unknown: %s (EF.DIR)" % a)
|
||||
else:
|
||||
log.warn("EF.DIR seems to be empty!")
|
||||
print("warning: EF.DIR seems to be empty!")
|
||||
|
||||
# Some card applications may not be registered in EF.DIR, we will actively
|
||||
# probe for those applications
|
||||
@@ -130,7 +123,7 @@ class RuntimeState:
|
||||
_data, sw = self.card.select_adf_by_aid(f.aid)
|
||||
self.selected_adf = f
|
||||
if sw == "9000":
|
||||
log.info(" %s: %s" % (f.name, f.aid))
|
||||
print(" %s: %s" % (f.name, f.aid))
|
||||
apps_taken.append(f)
|
||||
except (SwMatchError, ProtocolError):
|
||||
pass
|
||||
@@ -154,7 +147,7 @@ class RuntimeState:
|
||||
# select MF to reset internal state and to verify card really works
|
||||
self.lchan[0].select('MF', cmd_app)
|
||||
self.lchan[0].selected_adf = None
|
||||
# store ATR as part of our card identities dict
|
||||
# store ATR as part of our card identies dict
|
||||
self.identity['ATR'] = atr
|
||||
return atr
|
||||
|
||||
@@ -328,7 +321,7 @@ class RuntimeLchan:
|
||||
# If we succeed, we know that the file exists on the card and we may
|
||||
# proceed with creating a new CardEF object in the local file model at
|
||||
# run time. In case the file does not exist on the card, we just abort.
|
||||
# The state on the card (selected file/application) won't be changed,
|
||||
# The state on the card (selected file/application) wont't be changed,
|
||||
# so we do not have to update any state in that case.
|
||||
(data, _sw) = self.scc.select_file(fid)
|
||||
except SwMatchError as swm:
|
||||
@@ -514,47 +507,6 @@ class RuntimeLchan:
|
||||
dec_data = self.selected_file.decode_hex(data)
|
||||
return (dec_data, sw)
|
||||
|
||||
def __get_writeable_size(self):
|
||||
""" Determine the writable size (file or record) using the cached FCP parameters of the currently selected
|
||||
file. Return None in case the writeable size cannot be determined (no FCP available, FCP lacks size
|
||||
information).
|
||||
"""
|
||||
fcp = self.selected_file_fcp
|
||||
if not fcp:
|
||||
return None
|
||||
|
||||
structure = fcp.get('file_descriptor', {}).get('file_descriptor_byte', {}).get('structure')
|
||||
if not structure:
|
||||
return None
|
||||
|
||||
if structure == 'transparent':
|
||||
return fcp.get('file_size')
|
||||
elif structure == 'linear_fixed':
|
||||
return fcp.get('file_descriptor', {}).get('record_len')
|
||||
else:
|
||||
return None
|
||||
|
||||
def __check_writeable_size(self, data_len):
|
||||
""" Guard against unsuccessful writes caused by attempts to write data that exceeds the file limits. """
|
||||
|
||||
writeable_size = self.__get_writeable_size()
|
||||
if not writeable_size:
|
||||
return
|
||||
|
||||
if isinstance(self.selected_file, TransparentEF):
|
||||
writeable_name = "file"
|
||||
elif isinstance(self.selected_file, LinFixedEF):
|
||||
writeable_name = "record"
|
||||
else:
|
||||
writeable_name = "object"
|
||||
|
||||
if data_len > writeable_size:
|
||||
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
|
||||
(data_len, writeable_name, writeable_size, data_len - writeable_size))
|
||||
elif data_len < writeable_size:
|
||||
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
|
||||
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
|
||||
|
||||
def update_binary(self, data_hex: str, offset: int = 0):
|
||||
"""Update transparent EF binary data.
|
||||
|
||||
@@ -565,7 +517,6 @@ class RuntimeLchan:
|
||||
if not isinstance(self.selected_file, TransparentEF):
|
||||
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
|
||||
self.selected_file.__class__.__mro__))
|
||||
self.__check_writeable_size(len(data_hex) // 2 + offset)
|
||||
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
|
||||
|
||||
def update_binary_dec(self, data: dict):
|
||||
@@ -613,7 +564,6 @@ class RuntimeLchan:
|
||||
if not isinstance(self.selected_file, LinFixedEF):
|
||||
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
|
||||
self.selected_file.__class__.__mro__))
|
||||
self.__check_writeable_size(len(data_hex) // 2)
|
||||
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
|
||||
conserve=self.rs.conserve_write,
|
||||
leftpad=self.selected_file.leftpad)
|
||||
|
||||
@@ -32,7 +32,7 @@ class SecureChannel(abc.ABC):
|
||||
pass
|
||||
|
||||
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
|
||||
"""Wrapper function to wrap command APDU and unwrap response APDU around send_apdu callable."""
|
||||
"""Wrapper function to wrap command APDU and unwrap repsonse APDU around send_apdu callable."""
|
||||
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
|
||||
res, sw = send_fn(pdu_wrapped, *args, **kwargs)
|
||||
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))
|
||||
|
||||
@@ -200,7 +200,7 @@ class LinkBase(abc.ABC):
|
||||
# It *was* successful after all -- the extra pieces FETCH handled
|
||||
# need not concern the caller.
|
||||
rv = (rv[0], '9000')
|
||||
# proactive sim as per TS 102 221 Section 7.4.2
|
||||
# proactive sim as per TS 102 221 Setion 7.4.2
|
||||
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
|
||||
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
|
||||
# Setting this in case we later decide not to send a terminal
|
||||
@@ -228,7 +228,7 @@ class LinkBase(abc.ABC):
|
||||
# Structure as per TS 102 223 V4.4.0 Section 6.8
|
||||
|
||||
# Testing hint: The value of tail does not influence the behavior
|
||||
# of an SJA2 that sent an SMS, so this is implemented only
|
||||
# of an SJA2 that sent ans SMS, so this is implemented only
|
||||
# following TS 102 223, and not fully tested.
|
||||
ti_list_bin = [x.to_tlv() for x in ti_list]
|
||||
tail = b''.join(ti_list_bin)
|
||||
|
||||
@@ -208,7 +208,7 @@ EF_5G_PROSE_ST_map = {
|
||||
5: '5G ProSe configuration data for usage information reporting',
|
||||
}
|
||||
|
||||
# Mapping between USIM Enabled Service Number and its description
|
||||
# Mapping between USIM Enbled Service Number and its description
|
||||
EF_EST_map = {
|
||||
1: 'Fixed Dialling Numbers (FDN)',
|
||||
2: 'Barred Dialling Numbers (BDN)',
|
||||
|
||||
@@ -119,7 +119,7 @@ class EF_AC_GBAUAPI(LinFixedEF):
|
||||
"""The use of this EF is eescribed in 3GPP TS 31.130"""
|
||||
class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
|
||||
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
|
||||
# length field which will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
||||
# length field whihc will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
||||
# safely be assumed shorter than that
|
||||
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
|
||||
'naf_id'/Prefixed(Int8ub, GreedyBytes))
|
||||
|
||||
@@ -1007,7 +1007,7 @@ class EF_ICCID(TransparentEF):
|
||||
def _encode_hex(self, abstract, **kwargs):
|
||||
return enc_iccid(abstract['iccid'])
|
||||
|
||||
# TS 102 221 Section 13.3 / TS 31.101 Section 13 / TS 51.011 Section 10.1.2
|
||||
# TS 102 221 Section 13.3 / TS 31.101 Secction 13 / TS 51.011 Section 10.1.2
|
||||
class EF_PL(TransRecEF):
|
||||
_test_de_encode = [
|
||||
( '6465', "de" ),
|
||||
|
||||
139
pySim/utils.py
139
pySim/utils.py
@@ -15,7 +15,6 @@ from osmocom.tlv import bertlv_encode_tag, bertlv_encode_len
|
||||
|
||||
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
||||
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
|
||||
# Copyright (C) 2009-2022 Ludovic Rousseau
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
@@ -331,7 +330,7 @@ def derive_mnc(digit1: int, digit2: int, digit3: int = 0x0f) -> int:
|
||||
mnc = 0
|
||||
|
||||
# 3-rd digit is optional for the MNC. If present
|
||||
# the algorithm is the same as for the MCC.
|
||||
# the algorythm is the same as for the MCC.
|
||||
if digit3 != 0x0f:
|
||||
return derive_mcc(digit1, digit2, digit3)
|
||||
|
||||
@@ -411,7 +410,7 @@ def get_addr_type(addr):
|
||||
|
||||
fqdn_flag = True
|
||||
for i in addr_list:
|
||||
# Only Alphanumeric characters and hyphen - RFC 1035
|
||||
# Only Alpha-numeric characters and hyphen - RFC 1035
|
||||
import re
|
||||
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
|
||||
fqdn_flag = False
|
||||
@@ -477,7 +476,7 @@ def expand_hex(hexstring, length):
|
||||
"""Expand a given hexstring to a specified length by replacing "." or ".."
|
||||
with a filler that is derived from the neighboring nibbles respective
|
||||
bytes. Usually this will be the nibble respective byte before "." or
|
||||
"..", except when the string begins with "." or "..", then the nibble
|
||||
"..", execpt when the string begins with "." or "..", then the nibble
|
||||
respective byte after "." or ".." is used.". In case the string cannot
|
||||
be expanded for some reason, the input string is returned unmodified.
|
||||
|
||||
@@ -586,138 +585,10 @@ def parse_command_apdu(apdu: bytes) -> int:
|
||||
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
|
||||
|
||||
|
||||
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||
def normalizeATR(atr):
|
||||
"""Transform an ATR in list of integers.
|
||||
valid input formats are
|
||||
"3B A7 00 40 18 80 65 A2 08 01 01 52"
|
||||
"3B:A7:00:40:18:80:65:A2:08:01:01:52"
|
||||
|
||||
Args:
|
||||
atr: string
|
||||
Returns:
|
||||
list of bytes
|
||||
|
||||
>>> normalize("3B:A7:00:40:18:80:65:A2:08:01:01:52")
|
||||
[59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82]
|
||||
"""
|
||||
atr = atr.replace(":", "")
|
||||
atr = atr.replace(" ", "")
|
||||
|
||||
res = []
|
||||
while len(atr) >= 2:
|
||||
byte, atr = atr[:2], atr[2:]
|
||||
res.append(byte)
|
||||
if len(atr) > 0:
|
||||
raise ValueError("warning: odd string, remainder: %r" % atr)
|
||||
|
||||
atr = [int(x, 16) for x in res]
|
||||
return atr
|
||||
|
||||
|
||||
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||
def decomposeATR(atr_txt):
|
||||
"""Decompose the ATR in elementary fields
|
||||
|
||||
Args:
|
||||
atr_txt: ATR as a hex bytes string
|
||||
Returns:
|
||||
dictionary of field and values
|
||||
|
||||
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
|
||||
{ 'T0': {'value': 167},
|
||||
'TB': {1: {'value': 0}},
|
||||
'TC': {2: {'value': 24}},
|
||||
'TD': {1: {'value': 64}},
|
||||
'TS': {'value': 59},
|
||||
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
|
||||
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
|
||||
'hbn': 7}
|
||||
"""
|
||||
ATR_PROTOCOL_TYPE_T0 = 0
|
||||
atr_txt = normalizeATR(atr_txt)
|
||||
atr = {}
|
||||
|
||||
# the ATR itself as a list of integers
|
||||
atr["atr"] = atr_txt
|
||||
|
||||
# store TS and T0
|
||||
atr["TS"] = {"value": atr_txt[0]}
|
||||
TDi = atr_txt[1]
|
||||
atr["T0"] = {"value": TDi}
|
||||
hb_length = TDi & 15
|
||||
pointer = 1
|
||||
# protocol number
|
||||
pn = 1
|
||||
|
||||
# store number of historical bytes
|
||||
atr["hbn"] = TDi & 0xF
|
||||
|
||||
while pointer < len(atr_txt):
|
||||
# Check TAi is present
|
||||
if (TDi | 0xEF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TA" not in atr:
|
||||
atr["TA"] = {}
|
||||
atr["TA"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TBi is present
|
||||
if (TDi | 0xDF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TB" not in atr:
|
||||
atr["TB"] = {}
|
||||
atr["TB"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TCi is present
|
||||
if (TDi | 0xBF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TC" not in atr:
|
||||
atr["TC"] = {}
|
||||
atr["TC"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TDi is present
|
||||
if (TDi | 0x7F) == 0xFF:
|
||||
pointer += 1
|
||||
if "TD" not in atr:
|
||||
atr["TD"] = {}
|
||||
TDi = atr_txt[pointer]
|
||||
atr["TD"][pn] = {"value": TDi}
|
||||
if (TDi & 0x0F) != ATR_PROTOCOL_TYPE_T0:
|
||||
atr["TCK"] = True
|
||||
pn += 1
|
||||
else:
|
||||
break
|
||||
|
||||
# Store historical bytes
|
||||
atr["hb"] = {"value": atr_txt[pointer + 1 : pointer + 1 + hb_length]}
|
||||
|
||||
# Store TCK
|
||||
last = pointer + 1 + hb_length
|
||||
if "TCK" in atr:
|
||||
try:
|
||||
atr["TCK"] = {"value": atr_txt[last]}
|
||||
except IndexError:
|
||||
atr["TCK"] = {"value": -1}
|
||||
last += 1
|
||||
|
||||
if len(atr_txt) > last:
|
||||
atr["extra"] = atr_txt[last:]
|
||||
|
||||
if len(atr["hb"]["value"]) < hb_length:
|
||||
missing = hb_length - len(atr["hb"]["value"])
|
||||
if missing > 1:
|
||||
(t1, t2) = ("s", "are")
|
||||
else:
|
||||
(t1, t2) = ("", "is")
|
||||
atr["warning"] = "ATR is truncated: %d byte%s %s missing" % (missing, t1, t2)
|
||||
|
||||
return atr
|
||||
|
||||
|
||||
class DataObject(abc.ABC):
|
||||
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
|
||||
simply has any number of different TLVs that may occur in any order at any point, ISO 7816
|
||||
has the habit of specifying TLV data but with very specific ordering, or specific choices of
|
||||
has the habit of specifying TLV data but with very spcific ordering, or specific choices of
|
||||
tags at specific points in a stream. This class tries to represent this."""
|
||||
|
||||
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
|
||||
@@ -839,7 +710,7 @@ class TL0_DataObject(DataObject):
|
||||
|
||||
|
||||
class DataObjectCollection:
|
||||
"""A DataObjectCollection consists of multiple Data Objects identified by their tags.
|
||||
"""A DataObjectCollection consits of multiple Data Objects identified by their tags.
|
||||
A given encoded DO may contain any of them in any order, and may contain multiple instances
|
||||
of each DO."""
|
||||
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
requires = ["setuptools", "wheel", "pybind11"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.pylint.main]
|
||||
ignored-classes = ["twisted.internet.reactor"]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pyscard
|
||||
pyserial
|
||||
pytlv
|
||||
cmd2>=2.6.2,<3.0
|
||||
cmd2>=1.5
|
||||
jsonpath-ng
|
||||
construct>=2.10.70
|
||||
bidict
|
||||
@@ -15,3 +15,8 @@ git+https://github.com/osmocom/asn1tools
|
||||
packaging
|
||||
git+https://github.com/hologram-io/smpp.pdu
|
||||
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
||||
pybind11
|
||||
klein
|
||||
service-identity
|
||||
pyopenssl
|
||||
requests
|
||||
28
setup.py
28
setup.py
@@ -1,4 +1,15 @@
|
||||
from setuptools import setup
|
||||
from pybind11.setup_helpers import Pybind11Extension, build_ext
|
||||
|
||||
ext_modules = [
|
||||
Pybind11Extension(
|
||||
"bsp_crypto",
|
||||
["bsp_python_bindings.cpp"],
|
||||
libraries=["ssl", "crypto"],
|
||||
extra_compile_args=["-ggdb", "-O0"],
|
||||
cxx_std=17,
|
||||
),
|
||||
]
|
||||
|
||||
setup(
|
||||
name='pySim',
|
||||
@@ -34,6 +45,11 @@ setup(
|
||||
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
|
||||
"asn1tools",
|
||||
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
|
||||
"pybind11",
|
||||
"klein",
|
||||
"service-identity",
|
||||
"pyopenssl",
|
||||
"requests",
|
||||
],
|
||||
scripts=[
|
||||
'pySim-prog.py',
|
||||
@@ -49,12 +65,8 @@ setup(
|
||||
'asn1/saip/*.asn',
|
||||
],
|
||||
},
|
||||
extras_require={
|
||||
"smdpp": [
|
||||
"klein",
|
||||
"service-identity",
|
||||
"pyopenssl",
|
||||
"requests",
|
||||
]
|
||||
},
|
||||
ext_modules=ext_modules,
|
||||
cmdclass={"build_ext": build_ext},
|
||||
zip_safe=False,
|
||||
python_requires=">=3.6",
|
||||
)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
0D AL¶þV¿eRÌÍìAˆHÊt£×ôͺ„nìE<Nåû R¤~&Àk\þ~ ÉRlÜÛ°Ÿ‰¥7ì¶NŒŽmWø
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,16 +0,0 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgzCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
|
||||
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
|
||||
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
|
||||
HQ8BAf8EBAMCB4AwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMBQG
|
||||
A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
|
||||
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
|
||||
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
|
||||
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
|
||||
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0gA
|
||||
MEUCIQCfaGcMk+kuSJsbIyRPWttwWNftwQdHCQuu346PaiA2FAIgUrqhPw2um9gV
|
||||
C+eWHaXio7WQh5L6VgLZzNifTQcldD4=
|
||||
-----END CERTIFICATE-----
|
||||
Binary file not shown.
@@ -1,16 +0,0 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgjCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
|
||||
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
|
||||
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
|
||||
DwEB/wQEAwIHgDAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwFAYD
|
||||
VR0gBA0wCzAJBgdngRIBAgEDMB0GA1UdDgQWBBQn/vHyKRh+x4Pt9uApZKRRjVfU
|
||||
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
|
||||
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
|
||||
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
|
||||
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
|
||||
RQIhAL+1lp/hGsj87/5RqOX2u3hS/VSftDN7EPrHJJFnTXLRAiBVxemKIKmC7+W1
|
||||
+RsTY5I51R+Cyoq4l5TEU49eplo5bw==
|
||||
-----END CERTIFICATE-----
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
107
smpp_ota_apdu.py
107
smpp_ota_apdu.py
@@ -1,107 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import sys
|
||||
from pprint import pprint as pp
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
import smpplib.gsm
|
||||
import smpplib.client
|
||||
import smpplib.consts
|
||||
import argparse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# if you want to know what's happening
|
||||
logging.basicConfig(level='DEBUG')
|
||||
|
||||
|
||||
|
||||
class Foo:
|
||||
def smpp_rx_handler(self, pdu):
|
||||
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
|
||||
if pdu.short_message:
|
||||
try:
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||
except ValueError:
|
||||
spi = self.spi.copy()
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||
pp(dec)
|
||||
return None
|
||||
|
||||
def __init__(self, kic, kid, idx, tar):
|
||||
# Two parts, UCS2, SMS with UDH
|
||||
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
|
||||
|
||||
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
|
||||
|
||||
# Print when obtain message_id
|
||||
client.set_message_sent_handler(
|
||||
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
|
||||
#client.set_message_received_handler(
|
||||
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
|
||||
client.set_message_received_handler(self.smpp_rx_handler)
|
||||
|
||||
client.connect()
|
||||
client.bind_transceiver(system_id='test', password='test')
|
||||
|
||||
self.client = client
|
||||
|
||||
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=idx, kic=h2b(kic),
|
||||
algo_auth='triple_des_cbc2', kid_idx=idx, kid=h2b(kid))
|
||||
self.ota_keyset.cntr = 0xdadb
|
||||
self.tar = h2b(tar)
|
||||
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
self.client.send_message(
|
||||
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure it is a byte string, not unicode:
|
||||
source_addr='12',
|
||||
|
||||
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure thease two params are byte strings, not unicode:
|
||||
destination_addr='23',
|
||||
short_message=tpdu,
|
||||
|
||||
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||
protocol_id=0x7f,
|
||||
#registered_delivery=True,
|
||||
)
|
||||
|
||||
def tx_c_apdu(self, apdu: bytes):
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument('-c', '--kic')
|
||||
parser.add_argument('-d', '--kid')
|
||||
parser.add_argument('-i', '--idx', type=int, default=1)
|
||||
parser.add_argument('-t', '--tar', default='b00011')
|
||||
parser.add_argument('apdu', default="", nargs='+')
|
||||
args = parser.parse_args()
|
||||
|
||||
f = Foo(args.kic, args.kid, args.idx, args.tar)
|
||||
print("initialized, sending APDU")
|
||||
f.tx_c_apdu(h2b("".join(args.apdu)))
|
||||
|
||||
f.client.listen()
|
||||
@@ -2,7 +2,7 @@ Detected UICC Add-on "SIM"
|
||||
Detected UICC Add-on "GSM-R"
|
||||
Detected UICC Add-on "RUIM"
|
||||
Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
|
||||
EF.DIR seems to be empty!
|
||||
warning: EF.DIR seems to be empty!
|
||||
ADF.ECASD: a0000005591010ffffffff8900000200
|
||||
ADF.ISD-R: a0000005591010ffffffff8900000100
|
||||
ISIM: a0000000871004
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
"card_type_id","formfactor_id","imsi","iccid","pin1","puk1","pin2","puk2","ki","adm1","adm2","proprietary","kic1","kic2","kic3","kid1","kid2","kid3","kik1","kik2","kik3","msisdn","acc","opc"
|
||||
"myCardType","3FF","901700000000001","8988211000000000001","1234","12345678","1223","12345678","AAAAAAAAAAA5435425AAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 01","BBBBBBBBBB3324BBBBBBBB21212BBBBB","CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC","DDDD90DDDDDDDDDDDDDDDDDD767DDDDD","EEEEEE567657567567EEEEEEEEEEEEEE","FFFFFFFFFFFFFFFFFFF56765765FFFFF","11111567811111111111111111111111","22222222222222222227669999222222","33333333333333333333333333333333","44444444444444445234544444444444","55555555555","0001","66666666666666666666666666666666"
|
||||
"myCardType","3FF","901700000000002","8988211000000000002","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA","10101010","9999999999999999","proprietary data 02","BBBBBB421BBBBBBBBBB12BBBBBBBBBBB","CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC","DDDDDDDDD567657DDDD2DDDDDDDDDDDD","EEEEEEEE56756EEEEEEEEE567657EEEE","FFFFF567657FFFFFFFFFFFFFFFFFFFFF","11111111111146113433411576511111","22222222222223432225765222222222","33333333333333523453333333333333","44425435234444444444446544444444","55555555555","0001","66666666666666266666666666666666"
|
||||
"myCardType","3FF","901700000000003","8988211000000000003","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 03","BBBBBBB45678BBBB756765BBBBBBBBBB","CCCCCCCCCCCCCC76543CCCC56765CCCC","DDDDDDDDDDDDDDDDDD5676575DDDDDDD","EEEEEEEEEEEEEEEEEE56765EEEEEEEEE","FFFFFFFFFFFFFFF567657FFFFFFFFFFF","11111111119876511111111111111111","22222222222444422222222222576522","33333332543333576733333333333333","44444444444567657567444444444444","55555555555","0001","66666675676575666666666666666666"
|
||||
|
||||
|
@@ -1,152 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
import os
|
||||
from pySim.card_key_provider import *
|
||||
|
||||
class TestCardKeyProviderCsv(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||
"OPC" : "000102030405065545645645645D0E0F",
|
||||
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||
"KID1" : "00040267840507667609045645645E0F",
|
||||
"KIK1" : "0001020307687607668678678C0D0E0F",
|
||||
"KIC2" : "000142457594860706090A0B0688678F",
|
||||
"KID2" : "600102030405649468690A0B0C0D648F",
|
||||
"KIK2" : "00010203330506070496330B08640E0F",
|
||||
"KIC3" : "000104030405064684686A068C0D0E0F",
|
||||
"KID3" : "00010243048468070809060B0C0D0E0F",
|
||||
"KIK3" : "00010204040506070809488B0C0D0E0F"}
|
||||
|
||||
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
|
||||
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def test_card_key_provider_get(self):
|
||||
test_data = [{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': '48a6d5f60567d45299e3ba08594009e7', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '3eb8567fa0b4b1e63bcab13bff5f2702',
|
||||
'KIC2': 'fd6c173a5b3f04b563808da24237fb46',
|
||||
'KIC3': '66c8c848e5dff69d70689d155d44f323',
|
||||
'KID1': 'd78accce870332dced467c173244dd94',
|
||||
'KID2': 'b3bf050969747b2d2c9389e127a3d791',
|
||||
'KID3': '40a77deb50d260b3041bbde1b5040625',
|
||||
'KIK1': '451b503239d818ea34421aa9c2a8887a',
|
||||
'KIK2': '967716f5fca8ae179f87f76524d1ae6b',
|
||||
'KIK3': '0884db5eee5409a00fc1bbc57ac52541',
|
||||
'OPC': '81817574c1961dd272ad080eb2caf279'}, 'ICCID' :"8988211000000000001"},
|
||||
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': 'e94d7fa6fb92375dae86744ff6ecef49', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '79b4e39387c66253da68f653381ded44',
|
||||
'KIC2': '560561b5dba89c1da8d1920049e5e4f7',
|
||||
'KIC3': '79ff35e84e39305a119af8c79f84e8e5',
|
||||
'KID1': '233baf89122159553d67545ecedcf8e0',
|
||||
'KID2': '8fc2874164d7a8e40d72c968bc894ab8',
|
||||
'KID3': '2e3320f0dda85054d261be920fbfa065',
|
||||
'KIK1': 'd51b1b17630103d1672a3e9e0e4827ed',
|
||||
'KIK2': 'd01edbc48be555139506b0d7982bf7ff',
|
||||
'KIK3': 'a6487a5170849e8e0a03026afea91f5a',
|
||||
'OPC': '6b0d19ef28bd12f2daac31828d426939'}, 'ICCID' :"8988211000000000002"},
|
||||
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': '3cdec1552ef433a89f327905213c5a6e', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '72986b13ce505e12653ad42df5cfca13',
|
||||
'KIC2': '8f0d1e58b01e833773e5562c4940674d',
|
||||
'KIC3': '9c72ba5a14d54f489edbffd3d8802f03',
|
||||
'KID1': 'd23a42995df9ca83f74b2cfd22695526',
|
||||
'KID2': '5c3a189d12aa1ac6614883d7de5e6c8c',
|
||||
'KID3': 'a6ace0d303a2b38a96b418ab83c16725',
|
||||
'KIK1': 'bf2319467d859c12527aa598430caef2',
|
||||
'KIK2': '6a4c459934bea7e40787976b8881ab01',
|
||||
'KIK3': '91cd02c38b5f68a98cc90a1f2299538f',
|
||||
'OPC': '6df46814b1697daca003da23808bbbc3'}, 'ICCID' :"8988211000000000003"}]
|
||||
|
||||
for t in test_data:
|
||||
result = card_key_provider_get(["PIN1","PUK1","PIN2","PUK2","KI","ADM1","ADM2","KIC1",
|
||||
"KIC2","KIC3","KID1","KID2","KID3","KIK1","KIK2","KIK3","OPC"],
|
||||
"ICCID", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
result = card_key_provider_get(["PIN1","puk1","PIN2","PUK2","KI","adm1","ADM2","KIC1",
|
||||
"KIC2","kic3","KID1","KID2","KID3","kik1","KIK2","KIK3","OPC"],
|
||||
"iccid", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
def test_card_key_provider_get_field(self):
|
||||
test_data = [{'EXPECTED' : "3eb8567fa0b4b1e63bcab13bff5f2702", 'ICCID' :"8988211000000000001"},
|
||||
{'EXPECTED' : "79b4e39387c66253da68f653381ded44", 'ICCID' :"8988211000000000002"},
|
||||
{'EXPECTED' : "72986b13ce505e12653ad42df5cfca13", 'ICCID' :"8988211000000000003"}]
|
||||
|
||||
for t in test_data:
|
||||
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
for t in test_data:
|
||||
result = card_key_provider_get_field("kic1", "iccid", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
class TestCardKeyFieldCryptor(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||
"OPC" : "000102030405065545645645645D0E0F",
|
||||
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||
"UICC_SCP03" : "00040267840507667609045645645E0F"}
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def test_encrypt_field(self):
|
||||
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||
|
||||
for t in test_data:
|
||||
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
def test_decrypt_field(self):
|
||||
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||
|
||||
for t in test_data:
|
||||
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,121 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import unittest
|
||||
import logging
|
||||
from pySim.log import PySimLogger
|
||||
import io
|
||||
import sys
|
||||
from inspect import currentframe, getframeinfo
|
||||
|
||||
log = PySimLogger.get("TEST")
|
||||
|
||||
TEST_MSG_DEBUG = "this is a debug message"
|
||||
TEST_MSG_INFO = "this is an info message"
|
||||
TEST_MSG_WARNING = "this is a warning message"
|
||||
TEST_MSG_ERROR = "this is an error message"
|
||||
TEST_MSG_CRITICAL = "this is a critical message"
|
||||
|
||||
expected_message = None
|
||||
|
||||
class PySimLogger_Test(unittest.TestCase):
|
||||
|
||||
def __test_01_safe_defaults_one(self, callback, message:str):
|
||||
# When log messages are sent to an unconfigured PySimLogger class, we expect the unmodified message being
|
||||
# logged to stdout, just as if it were printed via a normal print() statement.
|
||||
log_output = io.StringIO()
|
||||
sys.stdout = log_output
|
||||
callback(message)
|
||||
assert(log_output.getvalue().strip() == message)
|
||||
sys.stdout = sys.__stdout__
|
||||
|
||||
def test_01_safe_defaults(self):
|
||||
# When log messages are sent to an unconfigured PySimLogger class, we expect that all messages are logged,
|
||||
# regardless of the logging level.
|
||||
self.__test_01_safe_defaults_one(log.debug, TEST_MSG_DEBUG)
|
||||
self.__test_01_safe_defaults_one(log.info, TEST_MSG_INFO)
|
||||
self.__test_01_safe_defaults_one(log.warning, TEST_MSG_WARNING)
|
||||
self.__test_01_safe_defaults_one(log.error, TEST_MSG_ERROR)
|
||||
self.__test_01_safe_defaults_one(log.critical, TEST_MSG_CRITICAL)
|
||||
|
||||
@staticmethod
|
||||
def _test_print_callback(message):
|
||||
assert(message.strip() == expected_message)
|
||||
|
||||
def test_02_normal(self):
|
||||
# When the PySimLogger is set up with its default values, we expect formatted log messages on all logging
|
||||
# levels.
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
expected_message = "DEBUG: " + TEST_MSG_DEBUG
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
expected_message = "INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
def test_03_verbose(self):
|
||||
# When the PySimLogger is set up with its default values, we expect verbose formatted log messages on all
|
||||
# logging levels.
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
PySimLogger.set_verbose(True)
|
||||
frame = currentframe()
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
def test_04_level(self):
|
||||
# When the PySimLogger is set up with its default values, we expect formatted log messages but since we will
|
||||
# limit the log level to INFO, we should not see any messages of level DEBUG
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
# We test this in non verbose mode, this will also confirm that disabeling the verbose mode works.
|
||||
PySimLogger.set_verbose(False)
|
||||
|
||||
# Debug messages should not appear
|
||||
expected_message = None
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
|
||||
# All other messages should appear normally
|
||||
expected_message = "INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user