mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-22 13:28:33 +03:00
Compare commits
23 Commits
laforge/ot
...
osmith/wip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4ea1c9973 | ||
|
|
947154639c | ||
|
|
4ee99c18cd | ||
|
|
5d2e2ee259 | ||
|
|
92841f2cd5 | ||
|
|
caa955b3ac | ||
|
|
4dddcf932a | ||
|
|
10fe0e3aae | ||
|
|
076fec267a | ||
|
|
b4a12ecc14 | ||
|
|
6cffb31b42 | ||
|
|
6aed97d6c8 | ||
|
|
cb7d5aa3a7 | ||
|
|
70fedb5a46 | ||
|
|
7798ea9c5c | ||
|
|
0b1d3c85fd | ||
|
|
3c1a59640c | ||
|
|
ccefc98160 | ||
|
|
79805d1dd7 | ||
|
|
5969901be5 | ||
|
|
5316f2b1cc | ||
|
|
9572cbdb61 | ||
|
|
7fe7bff3d8 |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -7,6 +7,10 @@
|
||||
/.local
|
||||
/build
|
||||
/pySim.egg-info
|
||||
/smdpp-data/sm-dp-sessions
|
||||
/smdpp-data/sm-dp-sessions*
|
||||
dist
|
||||
tags
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
|
||||
smdpp-data/generated
|
||||
smdpp-data/certs/dhparam2048.pem
|
||||
|
||||
@@ -24,7 +24,7 @@ ICCID_HELP='The ICCID of the eSIM that shall be made available'
|
||||
MATCHID_HELP='MatchingID that shall be used by profile download'
|
||||
|
||||
parser = argparse.ArgumentParser(description="""
|
||||
Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
Utility to manually issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
|
||||
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
|
||||
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
|
||||
@@ -63,7 +63,7 @@ if __name__ == '__main__':
|
||||
data = {}
|
||||
for k, v in vars(opts).items():
|
||||
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
|
||||
# remove keys from dict that shold not end up in JSON...
|
||||
# remove keys from dict that should not end up in JSON...
|
||||
continue
|
||||
if v is not None:
|
||||
data[k] = v
|
||||
|
||||
@@ -68,7 +68,7 @@ parser_dl.add_argument('--confirmation-code',
|
||||
# notification
|
||||
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
|
||||
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
|
||||
help='Profile Management Opreation whoise occurrence shall be notififed')
|
||||
help='Profile Management Operation whoise occurrence shall be notififed')
|
||||
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
|
||||
help='eUICC global notification sequence number')
|
||||
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
||||
@@ -123,8 +123,8 @@ class Es9pClient:
|
||||
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
|
||||
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
|
||||
}
|
||||
if opts.iccid:
|
||||
ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
|
||||
if self.opts.iccid:
|
||||
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
|
||||
|
||||
if self.opts.operation == 'download':
|
||||
pird = {
|
||||
|
||||
661
contrib/generate_smdpp_certs.py
Executable file
661
contrib/generate_smdpp_certs.py
Executable file
@@ -0,0 +1,661 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Faithfully reproduces the smdpp certs contained in SGP.26_v1.5_Certificates_18_07_2024.zip
|
||||
available at https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/
|
||||
Only usable for testing, it obviously uses a different CI key.
|
||||
"""
|
||||
|
||||
import os
|
||||
import binascii
|
||||
from datetime import datetime
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
# Custom OIDs used in certificates
|
||||
OID_CERTIFICATE_POLICIES_CI = "2.23.146.1.2.1.0" # CI cert policy
|
||||
OID_CERTIFICATE_POLICIES_TLS = "2.23.146.1.2.1.3" # DPtls cert policy
|
||||
OID_CERTIFICATE_POLICIES_AUTH = "2.23.146.1.2.1.4" # DPauth cert policy
|
||||
OID_CERTIFICATE_POLICIES_PB = "2.23.146.1.2.1.5" # DPpb cert policy
|
||||
|
||||
# Subject Alternative Name OIDs
|
||||
OID_CI_RID = "2.999.1" # CI Registered ID
|
||||
OID_DP_RID = "2.999.10" # DP+ Registered ID
|
||||
OID_DP2_RID = "2.999.12" # DP+2 Registered ID
|
||||
OID_DP4_RID = "2.999.14" # DP+4 Registered ID
|
||||
OID_DP8_RID = "2.999.18" # DP+8 Registered ID
|
||||
|
||||
|
||||
class SimplifiedCertificateGenerator:
|
||||
def __init__(self):
|
||||
self.backend = default_backend()
|
||||
# Store generated CI keys to sign other certs
|
||||
self.ci_certs = {} # {"BRP": cert, "NIST": cert}
|
||||
self.ci_keys = {} # {"BRP": key, "NIST": key}
|
||||
|
||||
def get_curve(self, curve_type):
|
||||
"""Get the appropriate curve object."""
|
||||
if curve_type == "BRP":
|
||||
return ec.BrainpoolP256R1()
|
||||
else:
|
||||
return ec.SECP256R1()
|
||||
|
||||
def generate_key_pair(self, curve):
|
||||
"""Generate a new EC key pair."""
|
||||
private_key = ec.generate_private_key(curve, self.backend)
|
||||
return private_key
|
||||
|
||||
def load_private_key_from_hex(self, hex_key, curve):
|
||||
"""Load EC private key from hex string."""
|
||||
key_bytes = binascii.unhexlify(hex_key.replace(":", "").replace(" ", "").replace("\n", ""))
|
||||
key_int = int.from_bytes(key_bytes, 'big')
|
||||
return ec.derive_private_key(key_int, curve, self.backend)
|
||||
|
||||
def generate_ci_cert(self, curve_type):
|
||||
"""Generate CI certificate for either BRP or NIST curve."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.generate_key_pair(curve)
|
||||
|
||||
# Build subject and issuer (self-signed) - same for both
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test CI"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "TESTCERT"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSPTEST"),
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
|
||||
])
|
||||
|
||||
# Build certificate - all parameters same for both
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(issuer)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 8, 27, 51))
|
||||
builder = builder.not_valid_after(datetime(2055, 4, 1, 8, 27, 51))
|
||||
builder = builder.serial_number(0xb874f3abfa6c44d3)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
# Add extensions - all same for both
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=None),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_CI),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=True,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(OID_CI_RID))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(private_key, hashes.SHA256(), self.backend)
|
||||
|
||||
self.ci_keys[curve_type] = private_key
|
||||
self.ci_certs[curve_type] = certificate
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_dp_cert(self, curve_type, subject_cn, serial, key_hex,
|
||||
cert_policy_oid, rid_oid, validity_start, validity_end):
|
||||
"""Generate a DP certificate signed by CI - works for both BRP and NIST."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(cert_policy_oid),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_tls_cert(self, curve_type, subject_cn, dns_name, serial, key_hex,
|
||||
rid_oid, validity_start, validity_end):
|
||||
"""Generate a TLS certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.ExtendedKeyUsage([
|
||||
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
|
||||
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_TLS),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.DNSName(dns_name),
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_eum_cert(self, curve_type, key_hex):
|
||||
"""Generate EUM certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "EUM Test"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 28, 37))
|
||||
builder = builder.not_valid_after(datetime(2054, 3, 24, 9, 28, 37))
|
||||
builder = builder.serial_number(0x12345678)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.2"), # EUM policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier("2.999.5"))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=0),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
# Name Constraints
|
||||
constrained_name = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032"),
|
||||
])
|
||||
|
||||
name_constraints = x509.NameConstraints(
|
||||
permitted_subtrees=[
|
||||
x509.DirectoryName(constrained_name)
|
||||
],
|
||||
excluded_subtrees=None
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
name_constraints,
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_euicc_cert(self, curve_type, eum_cert, eum_key, key_hex):
|
||||
"""Generate eUICC certificate signed by EUM."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032123451234512345678901235"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test eUICC"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(eum_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 48, 58))
|
||||
builder = builder.not_valid_after(datetime(7496, 1, 24, 9, 48, 58))
|
||||
builder = builder.serial_number(0x0200000000000001)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(eum_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.1"), # eUICC policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(eum_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def save_cert_and_key(self, cert, key, cert_path_der, cert_path_pem, key_path_sk, key_path_pk):
|
||||
"""Save certificate and key in various formats."""
|
||||
# Create directories if needed
|
||||
os.makedirs(os.path.dirname(cert_path_der), exist_ok=True)
|
||||
|
||||
with open(cert_path_der, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.DER))
|
||||
|
||||
if cert_path_pem:
|
||||
with open(cert_path_pem, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
if key and key_path_sk:
|
||||
with open(key_path_sk, "wb") as f:
|
||||
f.write(key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
))
|
||||
|
||||
if key and key_path_pk:
|
||||
with open(key_path_pk, "wb") as f:
|
||||
f.write(key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
))
|
||||
|
||||
|
||||
def main():
|
||||
gen = SimplifiedCertificateGenerator()
|
||||
|
||||
output_dir = "smdpp-data/generated"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
print("=== Generating CI Certificates ===")
|
||||
|
||||
for curve_type in ["BRP", "NIST"]:
|
||||
ci_cert, ci_key = gen.generate_ci_cert(curve_type)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
ci_cert, ci_key,
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.der",
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.pem",
|
||||
None, None
|
||||
)
|
||||
print(f"Generated CI {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPauth Certificates ===")
|
||||
|
||||
dpauth_configs = [
|
||||
("BRP", "TEST SM-DP+", 256, "93:fb:33:d0:58:4f:34:9b:07:f8:b5:d2:af:93:d7:c3:e3:54:b3:49:a3:b9:13:50:2e:6a:bc:07:0e:4d:49:29", OID_DP_RID, "DPauth"),
|
||||
("NIST", "TEST SM-DP+", 256, "0a:7c:c1:c2:44:e6:0c:52:cd:5b:78:07:ab:8c:36:0c:26:52:46:01:50:7d:ca:bc:5d:d5:98:b5:a6:16:d5:d5", OID_DP_RID, "DPauth"),
|
||||
("BRP", "TEST SM-DP+2", 512, "0c:17:35:5c:01:1d:0f:e8:d7:da:dd:63:f1:97:85:cf:6c:51:cb:cd:46:6a:e8:8b:e8:f8:1b:c1:05:88:46:f6", OID_DP2_RID, "DP2auth"),
|
||||
("NIST", "TEST SM-DP+2", 512, "9c:32:a0:95:d4:88:42:d9:ff:a4:04:f7:12:51:2a:a2:c5:42:5a:1a:26:38:6a:b6:a1:45:d5:81:1e:03:91:41", OID_DP2_RID, "DP2auth"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dpauth_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_AUTH, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 31, 30),
|
||||
datetime(2030, 3, 30, 8, 31, 30)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPauth/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPauth/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPauth/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPpb Certificates ===")
|
||||
|
||||
dppb_configs = [
|
||||
("BRP", "TEST SM-DP+", 257, "75:ff:32:2f:41:66:16:da:e1:a4:84:ef:71:d4:87:4f:b0:df:32:95:fd:35:c2:cb:a4:89:fb:b2:bb:9c:7b:f6", OID_DP_RID, "DPpb"),
|
||||
("NIST", "TEST SM-DP+", 257, "dc:d6:94:b7:78:95:7e:8e:9a:dd:bd:d9:44:33:e9:ef:8f:73:d1:1e:49:1c:48:d4:25:a3:8a:94:91:bd:3b:ed", OID_DP_RID, "DPpb"),
|
||||
("BRP", "TEST SM-DP+2", 513, "9c:ae:2e:1a:56:07:a9:d5:78:38:2e:ee:93:2e:25:1f:52:30:4f:86:ee:b1:f1:70:8c:db:d3:c0:7b:e2:cd:3d", OID_DP2_RID, "DP2pb"),
|
||||
("NIST", "TEST SM-DP+2", 513, "66:93:11:49:63:9d:ba:ac:1d:c3:d3:06:c5:8b:d2:df:d2:2f:73:bf:63:ac:86:31:98:32:90:b5:7f:90:93:45", OID_DP2_RID, "DP2pb"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dppb_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_PB, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 34, 46),
|
||||
datetime(2030, 3, 30, 8, 34, 46)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPpb/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPpb/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPpb/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPtls Certificates ===")
|
||||
|
||||
dptls_configs = [
|
||||
("BRP", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "3f:67:15:28:02:b3:f4:c7:fa:e6:79:58:55:f6:82:54:1e:45:e3:5e:ff:f4:e8:a0:55:65:a0:f1:91:2a:78:2e", OID_DP_RID, "DP_TLS_BRP"),
|
||||
("NIST", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "a0:3e:7c:e4:55:04:74:be:a4:b7:a8:73:99:ce:5a:8c:9f:66:1b:68:0f:94:01:39:ff:f8:4e:9d:ec:6a:4d:8c", OID_DP_RID, "DP_TLS_NIST"),
|
||||
("NIST", "testsmdpplus2.example.com", "testsmdpplus2.example.com", 12, "4e:65:61:c6:40:88:f6:69:90:7a:db:e3:94:b1:1a:84:24:2e:03:3a:82:a8:84:02:31:63:6d:c9:1b:4e:e3:f5", OID_DP2_RID, "DP2_TLS"),
|
||||
("NIST", "testsmdpplus4.example.com", "testsmdpplus4.example.com", 14, "f2:65:9d:2f:52:8f:4b:11:37:40:d5:8a:0d:2a:f3:eb:2b:48:e1:22:c2:b6:0a:6a:f6:fc:96:ad:86:be:6f:a4", OID_DP4_RID, "DP4_TLS"),
|
||||
("NIST", "testsmdpplus8.example.com", "testsmdpplus8.example.com", 18, "ff:6e:4a:50:9b:ad:db:38:10:88:31:c2:3c:cc:2d:44:30:7a:f2:81:e9:25:96:7f:8c:df:1d:95:54:a0:28:8d", OID_DP8_RID, "DP8_TLS"),
|
||||
]
|
||||
|
||||
for curve_type, cn, dns, serial, key_hex, rid_oid, name_prefix in dptls_configs:
|
||||
cert, key = gen.generate_tls_cert(
|
||||
curve_type, cn, dns, serial, key_hex, rid_oid,
|
||||
datetime(2024, 7, 9, 15, 29, 36),
|
||||
datetime(2025, 8, 11, 15, 29, 36)
|
||||
)
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPtls/CERT_S_SM_{name_prefix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPtls/SK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem",
|
||||
f"{output_dir}/DPtls/PK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} certificate")
|
||||
|
||||
print("\n=== Generating EUM Certificates ===")
|
||||
|
||||
eum_configs = [
|
||||
("BRP", "12:9b:0a:b1:3f:17:e1:4a:40:b6:fa:4e:d8:23:e0:cf:46:5b:7b:3d:73:24:05:e6:29:5d:3b:23:b0:45:c9:9a"),
|
||||
("NIST", "25:e6:75:77:28:e1:e9:51:13:51:9c:dc:34:55:5c:29:ba:ed:23:77:3a:c5:af:dd:dc:da:d9:84:89:8a:52:f0"),
|
||||
]
|
||||
|
||||
eum_certs = {}
|
||||
eum_keys = {}
|
||||
|
||||
for curve_type, key_hex in eum_configs:
|
||||
cert, key = gen.generate_eum_cert(curve_type, key_hex)
|
||||
eum_certs[curve_type] = cert
|
||||
eum_keys[curve_type] = key
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/EUM/CERT_EUM{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/EUM/SK_EUM{suffix}.pem",
|
||||
f"{output_dir}/EUM/PK_EUM{suffix}.pem"
|
||||
)
|
||||
print(f"Generated EUM {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating eUICC Certificates ===")
|
||||
|
||||
euicc_configs = [
|
||||
("BRP", "8d:c3:47:a7:6d:b7:bd:d6:22:2d:d7:5e:a1:a1:68:8a:ca:81:1e:4c:bc:6a:7f:6a:ef:a4:b2:64:19:62:0b:90"),
|
||||
("NIST", "11:e1:54:67:dc:19:4f:33:71:83:e4:60:c9:f6:32:60:09:1e:12:e8:10:26:cd:65:61:e1:7c:6d:85:39:cc:9c"),
|
||||
]
|
||||
|
||||
for curve_type, key_hex in euicc_configs:
|
||||
cert, key = gen.generate_euicc_cert(curve_type, eum_certs[curve_type], eum_keys[curve_type], key_hex)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/eUICC/CERT_EUICC{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/eUICC/SK_EUICC{suffix}.pem",
|
||||
f"{output_dir}/eUICC/PK_EUICC{suffix}.pem"
|
||||
)
|
||||
print(f"Generated eUICC {curve_type} certificate")
|
||||
|
||||
print("\n=== Certificate generation complete! ===")
|
||||
print(f"All certificates saved to: {output_dir}/")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -82,10 +82,6 @@ case "$JOB_TYPE" in
|
||||
|
||||
pip install -r requirements.txt
|
||||
|
||||
# XXX: workaround for https://github.com/python-cmd2/cmd2/issues/1414
|
||||
# 2.4.3 was the last stable release not affected by this bug (OS#6776)
|
||||
pip install cmd2==2.4.3
|
||||
|
||||
rm -rf docs/_build
|
||||
make -C "docs" html latexpdf
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ parser_rpe.add_argument('--output-file', required=True, help='Output file name')
|
||||
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
|
||||
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
|
||||
|
||||
parser_rn = subparsers.add_parser('remove-naa', help='Remove speciifed NAAs from PE-Sequence')
|
||||
parser_rn = subparsers.add_parser('remove-naa', help='Remove specified NAAs from PE-Sequence')
|
||||
parser_rn.add_argument('--output-file', required=True, help='Output file name')
|
||||
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
|
||||
# TODO: add an --naa-index or the like, so only one given instance can be removed
|
||||
|
||||
@@ -27,5 +27,5 @@ PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH add-app-i
|
||||
# Display the contents of the resulting application PE:
|
||||
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
|
||||
|
||||
# For an explaination of --uicc-toolkit-app-spec-pars, see:
|
||||
# For an explanation of --uicc-toolkit-app-spec-pars, see:
|
||||
# ETSI TS 102 226, section 8.2.1.3.2.2.1
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# A more useful verion of the 'unber' tool provided with asn1c:
|
||||
# A more useful version of the 'unber' tool provided with asn1c:
|
||||
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
|
||||
|
||||
import sys
|
||||
|
||||
@@ -21,6 +21,14 @@ project = 'osmopysim-usermanual'
|
||||
copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
|
||||
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
|
||||
# manually as line break (https://github.com/sphinx-doc/sphinx/issues/6875)
|
||||
latex_elements = {
|
||||
"maketitle":
|
||||
r"""\author{Sylvain Munaut, Harald Welte, Philipp Maier, \and Supreeth Herle, Merlin Chlosta}
|
||||
\sphinxmaketitle
|
||||
"""
|
||||
}
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ Two modes are possible:
|
||||
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
|
||||
``--num`` remains unchanged.
|
||||
|
||||
The parameter ``--num`` specifies a card individual number. This number will be manged into the random seed so that
|
||||
The parameter ``--num`` specifies a card individual number. This number will be managed into the random seed so that
|
||||
it serves as an identifier for a particular set of randomly generated parameters.
|
||||
|
||||
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
|
||||
@@ -77,7 +77,7 @@ the parameter ``--type``. The following card types are supported:
|
||||
|
||||
Specifying the card reader:
|
||||
|
||||
It is most common to use ``pySim-prog`` together whith a PCSC reader. The PCSC reader number is specified via the
|
||||
It is most common to use ``pySim-prog`` together with a PCSC reader. The PCSC reader number is specified via the
|
||||
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
|
||||
the ``--help`` option of ``pySim-prog`` for more information.
|
||||
|
||||
|
||||
@@ -21,9 +21,9 @@ osmo-smdpp currently
|
||||
|
||||
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
|
||||
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
|
||||
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with mathcing eUICCs.
|
||||
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with matching eUICCs.
|
||||
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
|
||||
of the EID or whether it was donwloaded before. This is actually very useful for R&D and testing, as it
|
||||
of the EID or whether it was downloaded before. This is actually very useful for R&D and testing, as it
|
||||
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
|
||||
production usage.
|
||||
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
|
||||
|
||||
@@ -75,7 +75,7 @@ The response body is a JSON document, either
|
||||
#. key freshness failure
|
||||
#. unspecified card error
|
||||
|
||||
Example (succcess):
|
||||
Example (success):
|
||||
::
|
||||
|
||||
{
|
||||
|
||||
@@ -17,7 +17,7 @@ In any case, in order to operate a SUCI-enabled 5G SA network, you will have to
|
||||
#. deploy the public key on your USIMs
|
||||
#. deploy the private key on your 5GC, specifically the UDM function
|
||||
|
||||
pysim contains (int its `contrib` directory) a small utility program that can make it easy to generate
|
||||
pysim contains (in its `contrib` directory) a small utility program that can make it easy to generate
|
||||
such keys: `suci-keytool.py`
|
||||
|
||||
Generating keys
|
||||
|
||||
@@ -30,7 +30,7 @@ This guide covers the basic workflow of provisioning SIM cards with the 5G SUCI
|
||||
|
||||
For specific information on sysmocom SIM cards, refer to
|
||||
|
||||
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the curent
|
||||
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the current
|
||||
sysmoISIM-SJA5 product
|
||||
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
|
||||
sysmoISIM-SJA2 product
|
||||
|
||||
458
osmo-smdpp.py
458
osmo-smdpp.py
@@ -17,28 +17,124 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import uuid
|
||||
import os
|
||||
import functools
|
||||
from typing import Optional, Dict, List
|
||||
from pprint import pprint as pp
|
||||
|
||||
import base64
|
||||
from base64 import b64decode
|
||||
from klein import Klein
|
||||
from twisted.web.iweb import IRequest
|
||||
# asn1tools issue https://github.com/eerimoq/asn1tools/issues/194
|
||||
# must be first here
|
||||
import asn1tools
|
||||
import asn1tools.codecs.ber
|
||||
import asn1tools.codecs.der
|
||||
# do not move the code
|
||||
def fix_asn1_oid_decoding():
|
||||
fix_asn1_schema = """
|
||||
TestModule DEFINITIONS ::= BEGIN
|
||||
TestOid ::= SEQUENCE {
|
||||
oid OBJECT IDENTIFIER
|
||||
}
|
||||
END
|
||||
"""
|
||||
|
||||
from osmocom.utils import h2b, b2h, swap_nibbles
|
||||
fix_asn1_asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||
fix_asn1_oid_string = '2.999.10'
|
||||
fix_asn1_encoded = fix_asn1_asn1.encode('TestOid', {'oid': fix_asn1_oid_string})
|
||||
fix_asn1_decoded = fix_asn1_asn1.decode('TestOid', fix_asn1_encoded)
|
||||
|
||||
import pySim.esim.rsp as rsp
|
||||
from pySim.esim import saip, PMO
|
||||
from pySim.esim.es8p import *
|
||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
||||
if (fix_asn1_decoded['oid'] != fix_asn1_oid_string):
|
||||
# ASN.1 OBJECT IDENTIFIER Decoding Issue:
|
||||
#
|
||||
# In ASN.1 BER/DER encoding, the first two arcs of an OBJECT IDENTIFIER are
|
||||
# combined into a single value: (40 * arc0) + arc1. This is encoded as a base-128
|
||||
# variable-length quantity (and commonly known as VLQ or base-128 encoding)
|
||||
# as specified in ITU-T X.690 §8.19, it can span multiple bytes if
|
||||
# the value is large.
|
||||
#
|
||||
# For arc0 = 0 or 1, arc1 must be in [0, 39]. For arc0 = 2, arc1 can be any non-negative integer.
|
||||
# All subsequent arcs (arc2, arc3, ...) are each encoded as a separate base-128 VLQ.
|
||||
#
|
||||
# The decoding bug occurs when the decoder does not properly split the first
|
||||
# subidentifier for arc0 = 2 and arc1 >= 40. Instead of decoding:
|
||||
# - arc0 = 2
|
||||
# - arc1 = (first_subidentifier - 80)
|
||||
# it may incorrectly interpret the first_subidentifier as arc0 = (first_subidentifier // 40),
|
||||
# arc1 = (first_subidentifier % 40), which is only valid for arc1 < 40.
|
||||
#
|
||||
# This patch handles it properly for all valid OBJECT IDENTIFIERs
|
||||
# with large second arcs, by applying the ASN.1 rules:
|
||||
# - if first_subidentifier < 40: arc0 = 0, arc1 = first_subidentifier
|
||||
# - elif first_subidentifier < 80: arc0 = 1, arc1 = first_subidentifier - 40
|
||||
# - else: arc0 = 2, arc1 = first_subidentifier - 80
|
||||
#
|
||||
# This problem is not uncommon, see for example https://github.com/randombit/botan/issues/4023
|
||||
|
||||
def fixed_decode_object_identifier(data, offset, end_offset):
|
||||
"""Decode ASN.1 OBJECT IDENTIFIER from bytes to dotted string, fixing large second arc handling."""
|
||||
def read_subidentifier(data, offset):
|
||||
value = 0
|
||||
while True:
|
||||
b = data[offset]
|
||||
value = (value << 7) | (b & 0x7F)
|
||||
offset += 1
|
||||
if not (b & 0x80):
|
||||
break
|
||||
return value, offset
|
||||
|
||||
subid, offset = read_subidentifier(data, offset)
|
||||
if subid < 40:
|
||||
first = 0
|
||||
second = subid
|
||||
elif subid < 80:
|
||||
first = 1
|
||||
second = subid - 40
|
||||
else:
|
||||
first = 2
|
||||
second = subid - 80
|
||||
arcs = [first, second]
|
||||
|
||||
while offset < end_offset:
|
||||
subid, offset = read_subidentifier(data, offset)
|
||||
arcs.append(subid)
|
||||
|
||||
return '.'.join(str(x) for x in arcs)
|
||||
|
||||
asn1tools.codecs.ber.decode_object_identifier = fixed_decode_object_identifier
|
||||
asn1tools.codecs.der.decode_object_identifier = fixed_decode_object_identifier
|
||||
|
||||
# test our patch
|
||||
asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||
decoded = asn1.decode('TestOid', fix_asn1_encoded)['oid']
|
||||
assert fix_asn1_oid_string == str(decoded)
|
||||
|
||||
fix_asn1_oid_decoding()
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature # noqa: E402
|
||||
from cryptography import x509 # noqa: E402
|
||||
from cryptography.exceptions import InvalidSignature # noqa: E402
|
||||
from cryptography.hazmat.primitives import hashes # noqa: E402
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, dh # noqa: E402
|
||||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat # noqa: E402
|
||||
from pathlib import Path # noqa: E402
|
||||
import json # noqa: E402
|
||||
import sys # noqa: E402
|
||||
import argparse # noqa: E402
|
||||
import uuid # noqa: E402
|
||||
import os # noqa: E402
|
||||
import functools # noqa: E402
|
||||
from typing import Optional, Dict, List # noqa: E402
|
||||
from pprint import pprint as pp # noqa: E402
|
||||
|
||||
import base64 # noqa: E402
|
||||
from base64 import b64decode # noqa: E402
|
||||
from klein import Klein # noqa: E402
|
||||
from twisted.web.iweb import IRequest # noqa: E402
|
||||
|
||||
from osmocom.utils import h2b, b2h, swap_nibbles # noqa: E402
|
||||
|
||||
import pySim.esim.rsp as rsp # noqa: E402
|
||||
from pySim.esim import saip, PMO # noqa: E402
|
||||
from pySim.esim.es8p import ProfileMetadata,UnprotectedProfilePackage,ProtectedProfilePackage,BoundProfilePackage,BspInstance # noqa: E402
|
||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id # noqa: E402
|
||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError # noqa: E402
|
||||
|
||||
import logging # noqa: E402
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# HACK: make this configurable
|
||||
DATA_DIR = './smdpp-data'
|
||||
@@ -54,6 +150,173 @@ def set_headers(request: IRequest):
|
||||
request.setHeader('Content-Type', 'application/json;charset=UTF-8')
|
||||
request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
|
||||
|
||||
def validate_request_headers(request: IRequest):
|
||||
"""Validate mandatory HTTP headers according to SGP.22."""
|
||||
content_type = request.getHeader('Content-Type')
|
||||
if not content_type or not content_type.startswith('application/json'):
|
||||
raise ApiError('1.2.1', '2.1', 'Invalid Content-Type header')
|
||||
|
||||
admin_protocol = request.getHeader('X-Admin-Protocol')
|
||||
if admin_protocol and not admin_protocol.startswith('gsma/rsp/v'):
|
||||
raise ApiError('1.2.2', '2.1', 'Unsupported X-Admin-Protocol version')
|
||||
|
||||
def get_eum_certificate_variant(eum_cert) -> str:
|
||||
"""Determine EUM certificate variant by checking Certificate Policies extension.
|
||||
Returns 'O' for old variant, or 'NEW' for Ov3/A/B/C variants."""
|
||||
|
||||
try:
|
||||
cert_policies_ext = eum_cert.extensions.get_extension_for_oid(
|
||||
x509.oid.ExtensionOID.CERTIFICATE_POLICIES
|
||||
)
|
||||
|
||||
for policy in cert_policies_ext.value:
|
||||
policy_oid = policy.policy_identifier.dotted_string
|
||||
logger.debug(f"Found certificate policy: {policy_oid}")
|
||||
|
||||
if policy_oid == '2.23.146.1.2.1.2':
|
||||
logger.debug("Detected EUM certificate variant: O (old)")
|
||||
return 'O'
|
||||
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
||||
logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
||||
return 'NEW'
|
||||
except x509.ExtensionNotFound:
|
||||
logger.debug("No Certificate Policies extension found")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error checking certificate policies: {e}")
|
||||
|
||||
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||
"""Extract permitted IINs from EUM certificate using the appropriate method
|
||||
based on certificate variant (O vs Ov3/A/B/C).
|
||||
Returns list of permitted IINs (basically prefixes that valid EIDs must start with)."""
|
||||
|
||||
# Determine certificate variant first
|
||||
cert_variant = get_eum_certificate_variant(eum_cert)
|
||||
permitted_iins = []
|
||||
|
||||
if cert_variant == 'O':
|
||||
# Old variant - use nameConstraints extension
|
||||
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
||||
|
||||
else:
|
||||
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
||||
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
||||
|
||||
unique_iins = list(set(permitted_iins))
|
||||
|
||||
logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}")
|
||||
return unique_iins
|
||||
|
||||
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||
"""Parse the GSMA permittedEins extension using correct ASN.1 structure.
|
||||
PermittedEins ::= SEQUENCE OF PrintableString
|
||||
Each string contains an IIN (Issuer Identification Number) - a prefix of valid EIDs."""
|
||||
permitted_iins = []
|
||||
|
||||
try:
|
||||
permitted_eins_oid = x509.ObjectIdentifier('2.23.146.1.2.2.0') # sgp26: 2.23.146.1.2.2.0 = ASN1:SEQUENCE:permittedEins
|
||||
|
||||
for ext in eum_cert.extensions:
|
||||
if ext.oid == permitted_eins_oid:
|
||||
logger.debug(f"Found GSMA permittedEins extension: {ext.oid}")
|
||||
|
||||
# Get the DER-encoded extension value
|
||||
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
||||
|
||||
if isinstance(ext_der, bytes):
|
||||
try:
|
||||
permitted_eins_schema = """
|
||||
PermittedEins DEFINITIONS ::= BEGIN
|
||||
PermittedEins ::= SEQUENCE OF PrintableString
|
||||
END
|
||||
"""
|
||||
decoder = asn1tools.compile_string(permitted_eins_schema)
|
||||
decoded_strings = decoder.decode('PermittedEins', ext_der)
|
||||
|
||||
for iin_string in decoded_strings:
|
||||
# Each string contains an IIN -> prefix of euicc EID
|
||||
iin_clean = iin_string.strip().upper()
|
||||
|
||||
# IINs is 8 chars per sgp22, var len according to sgp29, fortunately we don't care
|
||||
if (len(iin_clean) == 8 and
|
||||
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
||||
len(iin_clean) % 2 == 0):
|
||||
permitted_iins.append(iin_clean)
|
||||
logger.debug(f"Found permitted IIN (GSMA): {iin_clean}")
|
||||
else:
|
||||
logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error parsing GSMA permittedEins extension: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error accessing GSMA certificate extensions: {e}")
|
||||
|
||||
return permitted_iins
|
||||
|
||||
|
||||
def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
||||
"""Parse permitted IINs from nameConstraints extension (variant O)."""
|
||||
permitted_iins = []
|
||||
|
||||
try:
|
||||
# Look for nameConstraints extension
|
||||
name_constraints_ext = eum_cert.extensions.get_extension_for_oid(
|
||||
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
||||
)
|
||||
|
||||
name_constraints = name_constraints_ext.value
|
||||
|
||||
# Check permittedSubtrees for IIN constraints
|
||||
if name_constraints.permitted_subtrees:
|
||||
for subtree in name_constraints.permitted_subtrees:
|
||||
|
||||
if isinstance(subtree, x509.DirectoryName):
|
||||
for attribute in subtree.value:
|
||||
# IINs for O in serialNumber
|
||||
if attribute.oid == x509.oid.NameOID.SERIAL_NUMBER:
|
||||
serial_value = attribute.value.upper()
|
||||
# sgp22 8, sgp29 var len, fortunately we don't care
|
||||
if (len(serial_value) == 8 and
|
||||
all(c in '0123456789ABCDEF' for c in serial_value) and
|
||||
len(serial_value) % 2 == 0):
|
||||
permitted_iins.append(serial_value)
|
||||
logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
||||
|
||||
except x509.ExtensionNotFound:
|
||||
logger.debug("No nameConstraints extension found")
|
||||
except Exception as e:
|
||||
logger.debug(f"Error parsing nameConstraints: {e}")
|
||||
|
||||
return permitted_iins
|
||||
|
||||
|
||||
def validate_eid_range(eid: str, eum_cert) -> bool:
|
||||
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
||||
if not eid or len(eid) != 32:
|
||||
logger.debug(f"Invalid EID format: {eid}")
|
||||
return False
|
||||
|
||||
try:
|
||||
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
||||
|
||||
if not permitted_eins:
|
||||
logger.debug("Warning: No permitted EINs found in EUM certificate")
|
||||
return False
|
||||
|
||||
eid_normalized = eid.upper()
|
||||
logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
||||
|
||||
for permitted_ein in permitted_eins:
|
||||
if eid_normalized.startswith(permitted_ein):
|
||||
logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
||||
return True
|
||||
|
||||
logger.debug(f"EID {eid_normalized} is not in any permitted EIN list")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error validating EID: {e}")
|
||||
return False
|
||||
|
||||
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
||||
r = {'subjectCode': subject_code, 'reasonCode': reason_code }
|
||||
if subject_id:
|
||||
@@ -72,12 +335,6 @@ def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_da
|
||||
if status_code_data:
|
||||
js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
|
||||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography import x509
|
||||
|
||||
def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
|
||||
"""convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
|
||||
@@ -125,22 +382,36 @@ class SmDppHttpServer:
|
||||
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
||||
"""Find CI certificate for given key identifier."""
|
||||
for cert in self.ci_certs:
|
||||
print("cert: %s" % cert)
|
||||
logger.debug("cert: %s" % cert)
|
||||
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
||||
print(subject_exts)
|
||||
logger.debug(subject_exts)
|
||||
subject_pkid = subject_exts[0].value
|
||||
print(subject_pkid)
|
||||
logger.debug(subject_pkid)
|
||||
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
||||
return cert
|
||||
return None
|
||||
|
||||
def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
|
||||
def validate_certificate_chain_for_verification(self, euicc_ci_pkid_list: List[bytes]) -> bool:
|
||||
"""Validate that SM-DP+ has valid certificate chains for the given CI PKIDs."""
|
||||
for ci_pkid in euicc_ci_pkid_list:
|
||||
ci_cert = self.ci_get_cert_for_pkid(ci_pkid)
|
||||
if ci_cert:
|
||||
# Check if our DPauth certificate chains to this CI
|
||||
try:
|
||||
cs = CertificateSet(ci_cert)
|
||||
cs.verify_cert_chain(self.dp_auth.cert)
|
||||
return True
|
||||
except VerifyError:
|
||||
continue
|
||||
return False
|
||||
|
||||
def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False):
|
||||
self.server_hostname = server_hostname
|
||||
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
|
||||
self.ci_certs = self.load_certs_from_path(ci_certs_path)
|
||||
# load DPauth cert + key
|
||||
self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
|
||||
cert_dir = os.path.join(DATA_DIR, 'certs')
|
||||
cert_dir = common_cert_path
|
||||
if use_brainpool:
|
||||
self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
|
||||
self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
|
||||
@@ -155,7 +426,15 @@ class SmDppHttpServer:
|
||||
else:
|
||||
self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
|
||||
self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
|
||||
self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
|
||||
if in_memory:
|
||||
self.rss = rsp.RspSessionStore(in_memory=True)
|
||||
logger.info("Using in-memory session storage")
|
||||
else:
|
||||
# Use different session database files for BRP and NIST to avoid file locking during concurrent runs
|
||||
session_db_suffix = "BRP" if use_brainpool else "NIST"
|
||||
db_path = os.path.join(DATA_DIR, f"sm-dp-sessions-{session_db_suffix}")
|
||||
self.rss = rsp.RspSessionStore(filename=db_path, in_memory=False)
|
||||
logger.info(f"Using file-based session storage: {db_path}")
|
||||
|
||||
@app.handle_errors(ApiError)
|
||||
def handle_apierror(self, request: IRequest, failure):
|
||||
@@ -179,11 +458,10 @@ class SmDppHttpServer:
|
||||
functionality, such as JSON decoding/encoding and debug-printing."""
|
||||
@functools.wraps(func)
|
||||
def _api_wrapper(self, request: IRequest):
|
||||
# TODO: evaluate User-Agent + X-Admin-Protocol header
|
||||
# TODO: reject any non-JSON Content-type
|
||||
validate_request_headers(request)
|
||||
|
||||
content = json.loads(request.content.read())
|
||||
print("Rx JSON: %s" % json.dumps(content))
|
||||
logger.debug("Rx JSON: %s" % json.dumps(content))
|
||||
set_headers(request)
|
||||
|
||||
output = func(self, request, content)
|
||||
@@ -191,7 +469,7 @@ class SmDppHttpServer:
|
||||
return ''
|
||||
|
||||
build_resp_header(output)
|
||||
print("Tx JSON: %s" % json.dumps(output))
|
||||
logger.debug("Tx JSON: %s" % json.dumps(output))
|
||||
return json.dumps(output)
|
||||
return _api_wrapper
|
||||
|
||||
@@ -210,7 +488,7 @@ class SmDppHttpServer:
|
||||
|
||||
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
||||
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
||||
print("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
#euiccInfo1['svn']
|
||||
|
||||
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
||||
@@ -218,6 +496,12 @@ class SmDppHttpServer:
|
||||
pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
|
||||
if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
|
||||
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
|
||||
|
||||
# Validate that SM-DP+ supports certificate chains for verification
|
||||
verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
|
||||
if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
|
||||
raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
|
||||
|
||||
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
|
||||
ci_cert = None
|
||||
for x in pkid_list:
|
||||
@@ -232,13 +516,6 @@ class SmDppHttpServer:
|
||||
if not ci_cert:
|
||||
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
|
||||
|
||||
# TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
|
||||
# * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
|
||||
# supported by the eUICC (indicated by euiccCiPKIdListForVerification).
|
||||
# * Using a certificate chain that the eUICC and the LPA both support:
|
||||
#euiccInfo1['euiccCiPKIdListForVerification']
|
||||
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
|
||||
|
||||
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
|
||||
# SHALL be unique within the scope and lifetime of each SM-DP+.
|
||||
transactionId = uuid.uuid4().hex.upper()
|
||||
@@ -254,9 +531,9 @@ class SmDppHttpServer:
|
||||
'serverAddress': self.server_hostname,
|
||||
'serverChallenge': serverChallenge,
|
||||
}
|
||||
print("Tx serverSigned1: %s" % serverSigned1)
|
||||
logger.debug("Tx serverSigned1: %s" % serverSigned1)
|
||||
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
||||
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||
logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||
output = {}
|
||||
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
||||
|
||||
@@ -285,7 +562,7 @@ class SmDppHttpServer:
|
||||
|
||||
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
||||
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
||||
print("Rx %s: %s" % authenticateServerResp)
|
||||
logger.debug("Rx %s: %s" % authenticateServerResp)
|
||||
if authenticateServerResp[0] == 'authenticateResponseError':
|
||||
r_err = authenticateServerResp[1]
|
||||
#r_err['transactionId']
|
||||
@@ -336,10 +613,12 @@ class SmDppHttpServer:
|
||||
if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
|
||||
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
||||
|
||||
# TODO: verify EID of eUICC cert is within permitted range of EUM cert
|
||||
|
||||
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
||||
print("EID (from eUICC cert): %s" % ss.eid)
|
||||
logger.debug("EID (from eUICC cert): %s" % ss.eid)
|
||||
|
||||
# Verify EID is within permitted range of EUM certificate
|
||||
if not validate_eid_range(ss.eid, eum_cert):
|
||||
raise ApiError('8.1.4', '6.1', 'EID is not within the permitted range of the EUM certificate')
|
||||
|
||||
# Verify that the serverChallenge attached to the ongoing RSP session matches the
|
||||
# serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
|
||||
@@ -350,6 +629,7 @@ class SmDppHttpServer:
|
||||
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
|
||||
# TODO: We really do a very simplistic job here, this needs to be properly implemented later,
|
||||
# considering all the various cases, profile state, etc.
|
||||
iccid_str = None
|
||||
if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
|
||||
cpca = euiccSigned1['ctxParams1'][1]
|
||||
matchingId = cpca.get('matchingId', None)
|
||||
@@ -372,7 +652,7 @@ class SmDppHttpServer:
|
||||
# there's currently no other option in the ctxParams1 choice, so this cannot happen
|
||||
raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication')
|
||||
|
||||
# FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
|
||||
# FIXME: we actually want to perform the profile binding herr, and read the profile metadata from the profile
|
||||
|
||||
# Put together profileMetadata + _bin
|
||||
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
|
||||
@@ -414,7 +694,7 @@ class SmDppHttpServer:
|
||||
|
||||
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
||||
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
||||
print("Rx %s: %s" % prepDownloadResp)
|
||||
logger.debug("Rx %s: %s" % prepDownloadResp)
|
||||
|
||||
if prepDownloadResp[0] == 'downloadResponseError':
|
||||
r_err = prepDownloadResp[1]
|
||||
@@ -436,37 +716,37 @@ class SmDppHttpServer:
|
||||
|
||||
# store otPK.EUICC.ECKA in session state
|
||||
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
||||
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
|
||||
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
||||
# Reference value of CERT.DPpb.ECDDSA
|
||||
print("curve = %s" % self.dp_pb.get_curve())
|
||||
logger.debug("curve = %s" % self.dp_pb.get_curve())
|
||||
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
||||
# extract the public key in (hopefully) the right format for the ES8+ interface
|
||||
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
|
||||
ss.host_id = b'mahlzeit'
|
||||
|
||||
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
||||
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
||||
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
||||
print("shared_secret: %s" % b2h(ss.shared_secret))
|
||||
logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
|
||||
|
||||
# TODO: Check if this order requires a Confirmation Code verification
|
||||
|
||||
# Perform actual protection + binding of profile package (or return pre-bound one)
|
||||
with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
|
||||
upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
|
||||
# HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
|
||||
# HACK: Use empty PPP as we're still debugging the configureISDP step, and we want to avoid
|
||||
# cluttering the log with stuff happening after the failure
|
||||
#upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
|
||||
if False:
|
||||
# Use random keys
|
||||
bpp = BoundProfilePackage.from_upp(upp)
|
||||
else:
|
||||
# Use sesssion keys
|
||||
# Use session keys
|
||||
ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
|
||||
bpp = BoundProfilePackage.from_ppp(ppp)
|
||||
|
||||
@@ -486,22 +766,22 @@ class SmDppHttpServer:
|
||||
request.setResponseCode(204)
|
||||
pendingNotification_bin = b64decode(content['pendingNotification'])
|
||||
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
||||
print("Rx %s: %s" % pendingNotification)
|
||||
logger.debug("Rx %s: %s" % pendingNotification)
|
||||
if pendingNotification[0] == 'profileInstallationResult':
|
||||
profileInstallRes = pendingNotification[1]
|
||||
pird = profileInstallRes['profileInstallationResultData']
|
||||
transactionId = b2h(pird['transactionId'])
|
||||
ss = self.rss.get(transactionId, None)
|
||||
if ss is None:
|
||||
print("Unable to find session for transactionId")
|
||||
return
|
||||
logger.warning(f"Unable to find session for transactionId: {transactionId}")
|
||||
return None # Will return HTTP 204 with empty body
|
||||
profileInstallRes['euiccSignPIR']
|
||||
# TODO: use original data, don't re-encode?
|
||||
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||
# verify eUICC signature
|
||||
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
||||
raise Exception('ECDSA signature verification failed on notification')
|
||||
print("Profile Installation Final Result: ", pird['finalResult'])
|
||||
logger.debug("Profile Installation Final Result: %s", pird['finalResult'])
|
||||
# remove session state
|
||||
del self.rss[transactionId]
|
||||
elif pendingNotification[0] == 'otherSignedNotification':
|
||||
@@ -526,7 +806,7 @@ class SmDppHttpServer:
|
||||
iccid = other_notif.get('iccid', None)
|
||||
if iccid:
|
||||
iccid = swap_nibbles(b2h(iccid))
|
||||
print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||
logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||
else:
|
||||
raise ValueError(pendingNotification)
|
||||
|
||||
@@ -539,7 +819,7 @@ class SmDppHttpServer:
|
||||
@rsp_api_wrapper
|
||||
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
||||
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
||||
print("Rx JSON: %s" % content)
|
||||
logger.debug("Rx JSON: %s" % content)
|
||||
transactionId = content['transactionId']
|
||||
|
||||
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
||||
@@ -549,7 +829,7 @@ class SmDppHttpServer:
|
||||
|
||||
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
||||
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
||||
print("Rx %s: %s" % cancelSessionResponse)
|
||||
logger.debug("Rx %s: %s" % cancelSessionResponse)
|
||||
|
||||
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
||||
# FIXME: print some error
|
||||
@@ -583,13 +863,51 @@ def main(argv):
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
||||
#parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
|
||||
|
||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
||||
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||
parser.add_argument("-b", "--brainpool", help="Use Brainpool curves instead of NIST",
|
||||
action='store_true', default=False)
|
||||
parser.add_argument("-m", "--in-memory", help="Use ephermal in-memory session storage (for concurrent runs)",
|
||||
action='store_true', default=False)
|
||||
args = parser.parse_args()
|
||||
|
||||
hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=False)
|
||||
#hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
|
||||
hs.app.run(args.host, args.port)
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
|
||||
|
||||
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
||||
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool)
|
||||
if(args.nossl):
|
||||
hs.app.run(args.host, args.port)
|
||||
else:
|
||||
curve_type = 'BRP' if args.brainpool else 'NIST'
|
||||
cert_derpath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.der'
|
||||
cert_pempath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.pem'
|
||||
cert_skpath = Path(common_cert_path) / 'DPtls' / f'SK_S_SM_DP_TLS_{curve_type}.pem'
|
||||
dhparam_path = Path(common_cert_path) / "dhparam2048.pem"
|
||||
if not dhparam_path.exists():
|
||||
print("Generating dh params, this takes a few seconds..")
|
||||
# Generate DH parameters with 2048-bit key size and generator 2
|
||||
parameters = dh.generate_parameters(generator=2, key_size=2048)
|
||||
pem_data = parameters.parameter_bytes(encoding=Encoding.PEM,format=ParameterFormat.PKCS3)
|
||||
with open(dhparam_path, 'wb') as file:
|
||||
file.write(pem_data)
|
||||
print("DH params created successfully")
|
||||
|
||||
if not cert_pempath.exists():
|
||||
print("Translating tls server cert from DER to PEM..")
|
||||
with open(cert_derpath, 'rb') as der_file:
|
||||
der_cert_data = der_file.read()
|
||||
|
||||
cert = x509.load_der_x509_certificate(der_cert_data)
|
||||
pem_cert = cert.public_bytes(Encoding.PEM) #.decode('utf-8')
|
||||
|
||||
with open(cert_pempath, 'wb') as pem_file:
|
||||
pem_file.write(pem_cert)
|
||||
|
||||
SERVER_STRING = f'ssl:{args.port}:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}'
|
||||
print(SERVER_STRING)
|
||||
|
||||
hs.app.run(host=HOSTNAME, port=args.port, endpoint_description=SERVER_STRING)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv)
|
||||
|
||||
@@ -586,7 +586,7 @@ def read_params_csv(opts, imsi=None, iccid=None):
|
||||
else:
|
||||
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
|
||||
|
||||
# NOTE: We might concider to specify a new CSV field "mnclen" in our
|
||||
# NOTE: We might consider to specify a new CSV field "mnclen" in our
|
||||
# CSV files for a better automatization. However, this only makes sense
|
||||
# when the tools and databases we export our files from will also add
|
||||
# such a field.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
#
|
||||
# Utility to display some informations about a SIM card
|
||||
# Utility to display some information about a SIM card
|
||||
#
|
||||
#
|
||||
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>
|
||||
|
||||
@@ -265,7 +265,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
def do_apdu(self, opts):
|
||||
"""Send a raw APDU to the card, and print SW + Response.
|
||||
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
|
||||
tracked. Dpending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
||||
tracked. Depending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
||||
a different file."""
|
||||
|
||||
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
|
||||
@@ -336,7 +336,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
|
||||
def _process_card(self, first, script_path):
|
||||
|
||||
# Early phase of card initialzation (this part may fail with an exception)
|
||||
# Early phase of card initialization (this part may fail with an exception)
|
||||
try:
|
||||
rs, card = init_card(self.sl)
|
||||
rc = self.equip(card, rs)
|
||||
@@ -377,7 +377,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
|
||||
bulk_script_parser = argparse.ArgumentParser()
|
||||
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
|
||||
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exeption occurs',
|
||||
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exception occurs',
|
||||
action='store_true')
|
||||
bulk_script_parser.add_argument('--tries', type=int, default=2,
|
||||
help='how many tries before trying the next card')
|
||||
@@ -731,7 +731,7 @@ class PySimCommands(CommandSet):
|
||||
body = {}
|
||||
for t in tags:
|
||||
result = self._cmd.lchan.retrieve_data(t)
|
||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
||||
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||
body[t] = b2h(val)
|
||||
else:
|
||||
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
|
||||
|
||||
@@ -84,7 +84,7 @@ def tcp_connected_callback(p: protocol.Protocol):
|
||||
logger.error("%s: connected!" % p)
|
||||
|
||||
class ProactChannel:
|
||||
"""Representation of a single proective channel."""
|
||||
"""Representation of a single protective channel."""
|
||||
def __init__(self, channels: 'ProactChannels', chan_nr: int):
|
||||
self.channels = channels
|
||||
self.chan_nr = chan_nr
|
||||
|
||||
@@ -151,7 +151,7 @@ global_group.add_argument('--no-suppress-select', action='store_false', dest='su
|
||||
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
|
||||
help="""
|
||||
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
|
||||
information that was not already received in resposne to the most recent SEELCT.""")
|
||||
information that was not already received in response to the most recent SEELCT.""")
|
||||
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
|
||||
help="""Show the raw APDU in addition to its parsed form.""")
|
||||
|
||||
@@ -188,7 +188,7 @@ parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
|
||||
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||
Read APDUs from a TCA Loader log file.""")
|
||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||
help='Name of te log file to be read')
|
||||
help='Name of the log file to be read')
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
||||
@@ -317,7 +317,7 @@ class ADF_ARAM(CardADF):
|
||||
store_ref_ar_do_parse = argparse.ArgumentParser()
|
||||
# REF-DO
|
||||
store_ref_ar_do_parse.add_argument(
|
||||
'--device-app-id', required=True, help='Identifies the specific device application that the rule appplies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
||||
'--device-app-id', required=True, help='Identifies the specific device application that the rule applies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
||||
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
||||
aid_grp.add_argument(
|
||||
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
|
||||
@@ -399,7 +399,7 @@ class ADF_ARAM(CardADF):
|
||||
sw_aram = {
|
||||
'ARA-M': {
|
||||
'6381': 'Rule successfully stored but an access rule already exists',
|
||||
'6382': 'Rule successfully stored bu contained at least one unknown (discarded) BER-TLV',
|
||||
'6382': 'Rule successfully stored but contained at least one unknown (discarded) BER-TLV',
|
||||
'6581': 'Memory Problem',
|
||||
'6700': 'Wrong Length in Lc',
|
||||
'6981': 'DO is not supported by the ARA-M/ARA-C',
|
||||
|
||||
@@ -163,7 +163,7 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
|
||||
provider_list : override the list of providers from the global default
|
||||
"""
|
||||
if not isinstance(provider, CardKeyProvider):
|
||||
raise ValueError("provider is not a card data provier")
|
||||
raise ValueError("provider is not a card data provider")
|
||||
provider_list.append(provider)
|
||||
|
||||
|
||||
@@ -181,7 +181,7 @@ def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_p
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provier")
|
||||
"provider list contains element which is not a card data provider")
|
||||
result = p.get(fields, key, value)
|
||||
if result:
|
||||
return result
|
||||
@@ -202,7 +202,7 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provier")
|
||||
"provider list contains element which is not a card data provider")
|
||||
result = p.get_field(field, key, value)
|
||||
if result:
|
||||
return result
|
||||
|
||||
@@ -112,6 +112,8 @@ class UiccCardBase(SimCardBase):
|
||||
def probe(self) -> bool:
|
||||
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
|
||||
ef_dir = EF_DIR()
|
||||
# select MF first
|
||||
self.file_exists("3f00")
|
||||
return self.file_exists(ef_dir.fid)
|
||||
|
||||
def read_aids(self) -> List[Hexstr]:
|
||||
|
||||
@@ -316,19 +316,19 @@ class FileList(COMPR_TLV_IE, tag=0x92):
|
||||
_construct = Struct('number_of_files'/Int8ub,
|
||||
'files'/GreedyRange(FileId))
|
||||
|
||||
# TS 102 223 Secton 8.19
|
||||
# TS 102 223 Section 8.19
|
||||
class LocationInformation(COMPR_TLV_IE, tag=0x93):
|
||||
pass
|
||||
|
||||
# TS 102 223 Secton 8.20
|
||||
# TS 102 223 Section 8.20
|
||||
class IMEI(COMPR_TLV_IE, tag=0x94):
|
||||
_construct = BcdAdapter(GreedyBytes)
|
||||
|
||||
# TS 102 223 Secton 8.21
|
||||
# TS 102 223 Section 8.21
|
||||
class HelpRequest(COMPR_TLV_IE, tag=0x95):
|
||||
pass
|
||||
|
||||
# TS 102 223 Secton 8.22
|
||||
# TS 102 223 Section 8.22
|
||||
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
|
||||
_construct = BcdAdapter(GreedyBytes)
|
||||
|
||||
|
||||
@@ -285,7 +285,7 @@ class SimCardCommands:
|
||||
return self.send_apdu_checksw(self.cla_byte + "a40304")
|
||||
|
||||
def select_adf(self, aid: Hexstr) -> ResTuple:
|
||||
"""Execute SELECT a given Applicaiton ADF.
|
||||
"""Execute SELECT a given Application ADF.
|
||||
|
||||
Args:
|
||||
aid : application identifier as hex string
|
||||
@@ -577,7 +577,7 @@ class SimCardCommands:
|
||||
|
||||
Args:
|
||||
rand : 16 byte random data as hex string (RAND)
|
||||
autn : 8 byte Autentication Token (AUTN)
|
||||
autn : 8 byte Authentication Token (AUTN)
|
||||
context : 16 byte random data ('3g' or 'gsm')
|
||||
"""
|
||||
# 3GPP TS 31.102 Section 7.1.2.1
|
||||
|
||||
@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
|
||||
block_nr = self.block_nr
|
||||
ciphertext = self._encrypt(padded_data)
|
||||
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
||||
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
|
||||
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
|
||||
return ciphertext
|
||||
|
||||
def decrypt(self, data:bytes) -> bytes:
|
||||
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
|
||||
temp_data = self.mac_chain + tag_and_length + data
|
||||
old_mcv = self.mac_chain
|
||||
c_mac = self._auth(temp_data)
|
||||
|
||||
# DEBUG: Show MAC computation details
|
||||
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||
|
||||
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
||||
ret = tag_and_length + data + c_mac
|
||||
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||
|
||||
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
||||
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
|
||||
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
|
||||
return ret
|
||||
|
||||
def verify(self, ciphertext: bytes) -> bool:
|
||||
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
|
||||
s_enc = out[l:2*l]
|
||||
s_mac = out[l*2:3*l]
|
||||
|
||||
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||
|
||||
return s_enc, s_mac, initial_mac_chaining_value
|
||||
|
||||
|
||||
@@ -231,9 +246,21 @@ class BspInstance:
|
||||
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||
assert tag <= 255
|
||||
assert len(plaintext) <= self.max_payload_size
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
|
||||
|
||||
# DEBUG: Show what we're processing
|
||||
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
|
||||
ciphered = self.c_algo.encrypt(plaintext)
|
||||
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||
|
||||
maced = self.m_algo.auth(tag, ciphered)
|
||||
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||
|
||||
return maced
|
||||
|
||||
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
||||
@@ -255,7 +282,7 @@ class BspInstance:
|
||||
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
|
||||
"""MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||
assert tag <= 255
|
||||
assert len(plaintext) < self.max_payload_size
|
||||
assert len(plaintext) <= self.max_payload_size
|
||||
maced = self.m_algo.auth(tag, plaintext)
|
||||
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
|
||||
self.c_algo.block_nr += 1
|
||||
|
||||
@@ -116,7 +116,7 @@ class param:
|
||||
pass
|
||||
|
||||
class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
"""Base classs for representing an ES2+ API Function."""
|
||||
"""Base class for representing an ES2+ API Function."""
|
||||
pass
|
||||
|
||||
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
|
||||
|
||||
@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
|
||||
from pySim.esim.bsp import BspInstance
|
||||
from pySim.esim import PMO
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
||||
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
||||
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
||||
@@ -196,8 +199,12 @@ class BoundProfilePackage(ProfilePackage):
|
||||
# 'initialiseSecureChannelRequest'
|
||||
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
||||
# firstSequenceOf87
|
||||
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
||||
# sequenceOF88
|
||||
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
||||
|
||||
if self.ppp: # we have to use session keys
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5"""
|
||||
"""GSMA eSIM RSP ES9+ interface according to SGP.22 v2.5"""
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
|
||||
@@ -159,7 +159,7 @@ class ApiError(Exception):
|
||||
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base classs for representing an HTTP[s] API Function."""
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
path = None
|
||||
|
||||
@@ -90,13 +90,61 @@ class RspSessionState:
|
||||
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
|
||||
del state['_smdp_otsk']
|
||||
del state['_smdp_ot_curve']
|
||||
# automatically recover all the remainig state
|
||||
# automatically recover all the remaining state
|
||||
self.__dict__.update(state)
|
||||
|
||||
|
||||
class RspSessionStore(shelve.DbfilenameShelf):
|
||||
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
|
||||
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
|
||||
class RspSessionStore:
|
||||
"""A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
|
||||
Can be configured to use either file-based storage or in-memory storage.
|
||||
We use it to store RspSessionState objects indexed by transactionId."""
|
||||
|
||||
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
|
||||
self._in_memory = in_memory
|
||||
|
||||
if in_memory:
|
||||
self._shelf = shelve.Shelf(dict())
|
||||
else:
|
||||
if filename is None:
|
||||
raise ValueError("filename is required for file-based session store")
|
||||
self._shelf = shelve.open(filename)
|
||||
|
||||
# dunder magic
|
||||
def __getitem__(self, key):
|
||||
return self._shelf[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._shelf[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self._shelf[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._shelf
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._shelf)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._shelf)
|
||||
|
||||
# everything else
|
||||
def __getattr__(self, name):
|
||||
"""Delegate attribute access to the underlying shelf object."""
|
||||
return getattr(self._shelf, name)
|
||||
|
||||
def close(self):
|
||||
"""Close the session store."""
|
||||
if hasattr(self._shelf, 'close'):
|
||||
self._shelf.close()
|
||||
if self._in_memory:
|
||||
# For in-memory store, clear the reference
|
||||
self._shelf = None
|
||||
|
||||
def sync(self):
|
||||
"""Synchronize the cache with the underlying storage."""
|
||||
if hasattr(self._shelf, 'sync'):
|
||||
self._shelf.sync()
|
||||
|
||||
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
|
||||
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This
|
||||
|
||||
@@ -334,7 +334,7 @@ class File:
|
||||
self.fill_pattern = pefi['fillPattern']
|
||||
self.fill_pattern_repeat = False
|
||||
elif fdb_dec['file_type'] == 'df':
|
||||
# only set it, if an earlier call to from_template() didn't alrady set it, as
|
||||
# only set it, if an earlier call to from_template() didn't already set it, as
|
||||
# the template can differentiate between MF, DF and ADF (unlike FDB)
|
||||
if not self.file_type:
|
||||
self.file_type = 'DF'
|
||||
@@ -427,7 +427,7 @@ class File:
|
||||
|
||||
class ProfileElement:
|
||||
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
|
||||
but ist more likely sub-classed with a specific class for the specific profile element type, like e.g
|
||||
but it's more likely sub-classed with a specific class for the specific profile element type, like e.g
|
||||
ProfileElementHeader, ProfileElementMF, ...
|
||||
"""
|
||||
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
|
||||
@@ -440,7 +440,7 @@ class ProfileElement:
|
||||
'genericFileManagement': 'gfm-header',
|
||||
'akaParameter': 'aka-header',
|
||||
'cdmaParameter': 'cdma-header',
|
||||
# note how they couldn't even consistently captialize the 'header' suffix :(
|
||||
# note how they couldn't even consistently capitalize the 'header' suffix :(
|
||||
'application': 'app-Header',
|
||||
'pukCodes': 'puk-Header',
|
||||
'pinCodes': 'pin-Header',
|
||||
@@ -628,7 +628,7 @@ class FsProfileElement(ProfileElement):
|
||||
# this is a template that belongs into the [A]DF of another template
|
||||
# 1) find the PE for the referenced template
|
||||
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
|
||||
# 2) resolve te [A]DF that forms the base of that parent PE
|
||||
# 2) resolve the [A]DF that forms the base of that parent PE
|
||||
pe_df = parent_pe.files[template.parent.base_df().pe_name].node
|
||||
self.pe_sequence.cur_df = pe_df
|
||||
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
|
||||
@@ -649,7 +649,7 @@ class FsProfileElement(ProfileElement):
|
||||
self.add_file(file)
|
||||
|
||||
def create_file(self, pename: str) -> File:
|
||||
"""Programatically create a file by its PE-Name."""
|
||||
"""Programmatically create a file by its PE-Name."""
|
||||
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
|
||||
file = File(pename, None, template.files_by_pename.get(pename, None))
|
||||
self.add_file(file)
|
||||
@@ -1409,7 +1409,7 @@ class ProfileElementHeader(ProfileElement):
|
||||
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
|
||||
**kwargs):
|
||||
"""You would usually initialize an instance either with a "decoded" argument (as read from
|
||||
a DER-encoded SAIP file via asn1tools), or [some of] the othe arguments in case you're
|
||||
a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're
|
||||
constructing a Profile Header from scratch.
|
||||
|
||||
Args:
|
||||
@@ -1562,7 +1562,7 @@ class ProfileElementSequence:
|
||||
|
||||
def _rebuild_pes_by_naa(self) -> None:
|
||||
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
|
||||
which basically means for every NAA there's a lsit of instances, and each consists
|
||||
which basically means for every NAA there's a list of instances, and each consists
|
||||
of a list of a list of PEs."""
|
||||
self.pres_by_naa = {}
|
||||
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
|
||||
@@ -1690,7 +1690,7 @@ class ProfileElementSequence:
|
||||
i += 1
|
||||
|
||||
def get_index_by_pe(self, pe: ProfileElement) -> int:
|
||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
||||
"""Return a list with the indices of all instances of PEs of petype."""
|
||||
ret = []
|
||||
i = 0
|
||||
for cur in self.pe_list:
|
||||
@@ -1711,7 +1711,7 @@ class ProfileElementSequence:
|
||||
self.insert_at_index(idx+1, pe_new)
|
||||
|
||||
def get_index_by_type(self, petype: str) -> List[int]:
|
||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
||||
"""Return a list with the indices of all instances of PEs of petype."""
|
||||
ret = []
|
||||
i = 0
|
||||
for pe in self.pe_list:
|
||||
@@ -1736,7 +1736,7 @@ class ProfileElementSequence:
|
||||
for service in naa.mandatory_services:
|
||||
if service in hdr.decoded['eUICC-Mandatory-services']:
|
||||
del hdr.decoded['eUICC-Mandatory-services'][service]
|
||||
# remove any associaed mandatory filesystem templates
|
||||
# remove any associated mandatory filesystem templates
|
||||
for template in naa.templates:
|
||||
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
|
||||
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
||||
@@ -2040,7 +2040,8 @@ class FsNodeADF(FsNodeDF):
|
||||
super().__init__(fid, parent, file, name)
|
||||
|
||||
def __str__(self):
|
||||
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name))
|
||||
# self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it
|
||||
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
|
||||
|
||||
class FsNodeMF(FsNodeDF):
|
||||
"""The MF (Master File) in the filesystem hierarchy."""
|
||||
|
||||
@@ -68,7 +68,7 @@ class CheckBasicStructure(ProfileConstraintChecker):
|
||||
|
||||
def check_optional_ordering(self, pes: ProfileElementSequence):
|
||||
"""Check the ordering of optional PEs following the respective mandatory ones."""
|
||||
# ordering and required depenencies
|
||||
# ordering and required dependencies
|
||||
self._is_after_if_exists(pes,'opt-usim', 'usim')
|
||||
self._is_after_if_exists(pes,'opt-isim', 'isim')
|
||||
self._is_after_if_exists(pes,'gsm-access', 'usim')
|
||||
|
||||
@@ -149,7 +149,7 @@ class CertificateSet:
|
||||
check_signed(c, self.root_cert)
|
||||
return
|
||||
parent_cert = self.intermediate_certs.get(aki, None)
|
||||
if not aki:
|
||||
if not parent_cert:
|
||||
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
|
||||
check_signed(c, parent_cert)
|
||||
# if we reach here, we passed (no exception raised)
|
||||
|
||||
@@ -39,7 +39,7 @@ from osmocom.utils import h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, i
|
||||
from osmocom.tlv import bertlv_parse_one
|
||||
from osmocom.construct import filter_dict, parse_construct, build_construct
|
||||
|
||||
from pySim.utils import sw_match
|
||||
from pySim.utils import sw_match, decomposeATR
|
||||
from pySim.jsonpath import js_path_modify
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.exceptions import SwMatchError
|
||||
@@ -86,7 +86,7 @@ class CardFile:
|
||||
self.service = service
|
||||
self.shell_commands = [] # type: List[CommandSet]
|
||||
|
||||
# Note: the basic properties (fid, name, ect.) are verified when
|
||||
# Note: the basic properties (fid, name, etc.) are verified when
|
||||
# the file is attached to a parent file. See method add_file() in
|
||||
# class Card DF
|
||||
|
||||
@@ -266,7 +266,7 @@ class CardFile:
|
||||
def get_profile(self):
|
||||
"""Get the profile associated with this file. If this file does not have any
|
||||
profile assigned, try to find a file above (usually the MF) in the filesystem
|
||||
hirarchy that has a profile assigned
|
||||
hierarchy that has a profile assigned
|
||||
"""
|
||||
|
||||
# If we have a profile set, return it
|
||||
@@ -679,7 +679,7 @@ class TransparentEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
size : tuple of (minimum_size, recommended_size)
|
||||
@@ -982,11 +982,11 @@ class LinFixedEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
rec_len : Tuple of (minimum_length, recommended_length)
|
||||
leftpad: On write, data must be padded from the left to fit pysical record length
|
||||
leftpad: On write, data must be padded from the left to fit physical record length
|
||||
"""
|
||||
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
|
||||
self.rec_len = rec_len
|
||||
@@ -1422,7 +1422,7 @@ class BerTlvEF(CardEF):
|
||||
Args:
|
||||
fid : File Identifier (4 hex digits)
|
||||
sfid : Short File Identifier (2 hex digits, optional)
|
||||
name : Brief name of the file, lik EF_ICCID
|
||||
name : Brief name of the file, like EF_ICCID
|
||||
desc : Description of the file
|
||||
parent : Parent CardFile object within filesystem hierarchy
|
||||
size : tuple of (minimum_size, recommended_size)
|
||||
@@ -1455,7 +1455,7 @@ class BerTlvEF(CardEF):
|
||||
export_str += "delete_all\n"
|
||||
for t in tags:
|
||||
result = lchan.retrieve_data(t)
|
||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
||||
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
|
||||
return export_str.strip()
|
||||
|
||||
@@ -1495,7 +1495,7 @@ class CardApplication:
|
||||
self.name = name
|
||||
self.adf = adf
|
||||
self.sw = sw or {}
|
||||
# back-reference from ADF to Applicaiton
|
||||
# back-reference from ADF to Application
|
||||
if self.adf:
|
||||
self.aid = aid or self.adf.aid
|
||||
self.adf.application = self
|
||||
@@ -1545,6 +1545,13 @@ class CardModel(abc.ABC):
|
||||
if atr == card_atr:
|
||||
print("Detected CardModel:", cls.__name__)
|
||||
return True
|
||||
# if nothing found try to just compare the Historical Bytes of the ATR
|
||||
card_atr_hb = decomposeATR(card_atr)['hb']
|
||||
for atr in cls._atrs:
|
||||
atr_hb = decomposeATR(atr)['hb']
|
||||
if atr_hb == card_atr_hb:
|
||||
print("Detected CardModel:", cls.__name__)
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
@@ -1565,7 +1572,7 @@ class Path:
|
||||
p = p.split('/')
|
||||
elif len(p) and isinstance(p[0], int):
|
||||
p = ['%04x' % x for x in p]
|
||||
# make sure internal representation alwas is uppercase only
|
||||
# make sure internal representation always is uppercase only
|
||||
self.list = [x.upper() for x in p]
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
||||
@@ -627,7 +627,7 @@ class ADF_SD(CardADF):
|
||||
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
|
||||
kcv = b2h(kcv_bin)
|
||||
if self._cmd.lchan.scc.scp:
|
||||
# encrypte key data with DEK of current SCP
|
||||
# encrypted key data with DEK of current SCP
|
||||
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
|
||||
else:
|
||||
# (for example) during personalization, DEK might not be required)
|
||||
@@ -755,7 +755,7 @@ class ADF_SD(CardADF):
|
||||
|
||||
inst_load_parser = argparse.ArgumentParser()
|
||||
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
|
||||
help='AID of the loded file')
|
||||
help='AID of the loaded file')
|
||||
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
|
||||
help='AID of the Security Domain into which the file shalle be added')
|
||||
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
|
||||
@@ -845,7 +845,7 @@ class ADF_SD(CardADF):
|
||||
# TODO:tune chunk_len based on the overhead of the used SCP?
|
||||
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
|
||||
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
|
||||
# transfer this in vaious chunks to the card
|
||||
# transfer this in various chunks to the card
|
||||
total_size = len(remainder)
|
||||
block_nr = 0
|
||||
while len(remainder):
|
||||
|
||||
@@ -104,4 +104,4 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
||||
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
|
||||
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
|
||||
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
|
||||
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s upport
|
||||
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s support
|
||||
|
||||
@@ -97,7 +97,7 @@ class CapFile():
|
||||
raise ValueError("invalid cap file, %s missing!" % required_components[component])
|
||||
|
||||
def get_loadfile(self) -> bytes:
|
||||
"""Get the executeable loadfile as hexstring"""
|
||||
"""Get the executable loadfile as hexstring"""
|
||||
# Concatenate all cap file components in the specified order
|
||||
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
|
||||
loadfile = self.__component['Header']
|
||||
|
||||
@@ -495,7 +495,7 @@ class IsimCard(UiccCardBase):
|
||||
|
||||
class MagicSimBase(abc.ABC, SimCard):
|
||||
"""
|
||||
Theses cards uses several record based EFs to store the provider infos,
|
||||
These cards uses several record based EFs to store the provider infos,
|
||||
each possible provider uses a specific record number in each EF. The
|
||||
indexes used are ( where N is the number of providers supported ) :
|
||||
- [2 .. N+1] for the operator name
|
||||
@@ -644,7 +644,7 @@ class MagicSim(MagicSimBase):
|
||||
|
||||
class FakeMagicSim(SimCard):
|
||||
"""
|
||||
Theses cards have a record based EF 3f00/000c that contains the provider
|
||||
These cards have a record based EF 3f00/000c that contains the provider
|
||||
information. See the program method for its format. The records go from
|
||||
1 to N.
|
||||
"""
|
||||
|
||||
@@ -296,7 +296,7 @@ def dec_addr_tlv(hexstr):
|
||||
|
||||
elif addr_type == 0x01: # IPv4
|
||||
# Skip address tye byte i.e. first byte in value list
|
||||
# Skip the unused byte in Octect 4 after address type byte as per 3GPP TS 31.102
|
||||
# Skip the unused byte in Octet 4 after address type byte as per 3GPP TS 31.102
|
||||
ipv4 = tlv[2][2:]
|
||||
content = '.'.join(str(x) for x in ipv4)
|
||||
return (content, '01')
|
||||
|
||||
@@ -110,7 +110,7 @@ class CardProfile:
|
||||
@abc.abstractmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
"""Try to see if the specific profile matches the card. This method is a
|
||||
placeholder that is overloaded by specific dirived classes. The method
|
||||
placeholder that is overloaded by specific derived classes. The method
|
||||
actively probes the card to make sure the profile class matches the
|
||||
physical card. This usually also means that the card is reset during
|
||||
the process, so this method must not be called at random times. It may
|
||||
|
||||
@@ -60,6 +60,9 @@ class RuntimeState:
|
||||
self.card.set_apdu_parameter(
|
||||
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
|
||||
|
||||
# make sure MF is selected before probing for Addons
|
||||
self.lchan[0].select('MF')
|
||||
|
||||
for addon_cls in self.profile.addons:
|
||||
addon = addon_cls()
|
||||
if addon.probe(self.card):
|
||||
@@ -147,7 +150,7 @@ class RuntimeState:
|
||||
# select MF to reset internal state and to verify card really works
|
||||
self.lchan[0].select('MF', cmd_app)
|
||||
self.lchan[0].selected_adf = None
|
||||
# store ATR as part of our card identies dict
|
||||
# store ATR as part of our card identities dict
|
||||
self.identity['ATR'] = atr
|
||||
return atr
|
||||
|
||||
@@ -321,7 +324,7 @@ class RuntimeLchan:
|
||||
# If we succeed, we know that the file exists on the card and we may
|
||||
# proceed with creating a new CardEF object in the local file model at
|
||||
# run time. In case the file does not exist on the card, we just abort.
|
||||
# The state on the card (selected file/application) wont't be changed,
|
||||
# The state on the card (selected file/application) won't be changed,
|
||||
# so we do not have to update any state in that case.
|
||||
(data, _sw) = self.scc.select_file(fid)
|
||||
except SwMatchError as swm:
|
||||
|
||||
@@ -32,7 +32,7 @@ class SecureChannel(abc.ABC):
|
||||
pass
|
||||
|
||||
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
|
||||
"""Wrapper function to wrap command APDU and unwrap repsonse APDU around send_apdu callable."""
|
||||
"""Wrapper function to wrap command APDU and unwrap response APDU around send_apdu callable."""
|
||||
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
|
||||
res, sw = send_fn(pdu_wrapped, *args, **kwargs)
|
||||
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))
|
||||
|
||||
@@ -200,7 +200,7 @@ class LinkBase(abc.ABC):
|
||||
# It *was* successful after all -- the extra pieces FETCH handled
|
||||
# need not concern the caller.
|
||||
rv = (rv[0], '9000')
|
||||
# proactive sim as per TS 102 221 Setion 7.4.2
|
||||
# proactive sim as per TS 102 221 Section 7.4.2
|
||||
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
|
||||
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
|
||||
# Setting this in case we later decide not to send a terminal
|
||||
@@ -228,7 +228,7 @@ class LinkBase(abc.ABC):
|
||||
# Structure as per TS 102 223 V4.4.0 Section 6.8
|
||||
|
||||
# Testing hint: The value of tail does not influence the behavior
|
||||
# of an SJA2 that sent ans SMS, so this is implemented only
|
||||
# of an SJA2 that sent an SMS, so this is implemented only
|
||||
# following TS 102 223, and not fully tested.
|
||||
ti_list_bin = [x.to_tlv() for x in ti_list]
|
||||
tail = b''.join(ti_list_bin)
|
||||
|
||||
@@ -208,7 +208,7 @@ EF_5G_PROSE_ST_map = {
|
||||
5: '5G ProSe configuration data for usage information reporting',
|
||||
}
|
||||
|
||||
# Mapping between USIM Enbled Service Number and its description
|
||||
# Mapping between USIM Enabled Service Number and its description
|
||||
EF_EST_map = {
|
||||
1: 'Fixed Dialling Numbers (FDN)',
|
||||
2: 'Barred Dialling Numbers (BDN)',
|
||||
|
||||
@@ -119,7 +119,7 @@ class EF_AC_GBAUAPI(LinFixedEF):
|
||||
"""The use of this EF is eescribed in 3GPP TS 31.130"""
|
||||
class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
|
||||
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
|
||||
# length field whihc will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
||||
# length field which will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
||||
# safely be assumed shorter than that
|
||||
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
|
||||
'naf_id'/Prefixed(Int8ub, GreedyBytes))
|
||||
|
||||
@@ -1007,7 +1007,7 @@ class EF_ICCID(TransparentEF):
|
||||
def _encode_hex(self, abstract, **kwargs):
|
||||
return enc_iccid(abstract['iccid'])
|
||||
|
||||
# TS 102 221 Section 13.3 / TS 31.101 Secction 13 / TS 51.011 Section 10.1.2
|
||||
# TS 102 221 Section 13.3 / TS 31.101 Section 13 / TS 51.011 Section 10.1.2
|
||||
class EF_PL(TransRecEF):
|
||||
_test_de_encode = [
|
||||
( '6465', "de" ),
|
||||
|
||||
139
pySim/utils.py
139
pySim/utils.py
@@ -15,6 +15,7 @@ from osmocom.tlv import bertlv_encode_tag, bertlv_encode_len
|
||||
|
||||
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
||||
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
|
||||
# Copyright (C) 2009-2022 Ludovic Rousseau
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
@@ -330,7 +331,7 @@ def derive_mnc(digit1: int, digit2: int, digit3: int = 0x0f) -> int:
|
||||
mnc = 0
|
||||
|
||||
# 3-rd digit is optional for the MNC. If present
|
||||
# the algorythm is the same as for the MCC.
|
||||
# the algorithm is the same as for the MCC.
|
||||
if digit3 != 0x0f:
|
||||
return derive_mcc(digit1, digit2, digit3)
|
||||
|
||||
@@ -410,7 +411,7 @@ def get_addr_type(addr):
|
||||
|
||||
fqdn_flag = True
|
||||
for i in addr_list:
|
||||
# Only Alpha-numeric characters and hyphen - RFC 1035
|
||||
# Only Alphanumeric characters and hyphen - RFC 1035
|
||||
import re
|
||||
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
|
||||
fqdn_flag = False
|
||||
@@ -476,7 +477,7 @@ def expand_hex(hexstring, length):
|
||||
"""Expand a given hexstring to a specified length by replacing "." or ".."
|
||||
with a filler that is derived from the neighboring nibbles respective
|
||||
bytes. Usually this will be the nibble respective byte before "." or
|
||||
"..", execpt when the string begins with "." or "..", then the nibble
|
||||
"..", except when the string begins with "." or "..", then the nibble
|
||||
respective byte after "." or ".." is used.". In case the string cannot
|
||||
be expanded for some reason, the input string is returned unmodified.
|
||||
|
||||
@@ -585,10 +586,138 @@ def parse_command_apdu(apdu: bytes) -> int:
|
||||
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
|
||||
|
||||
|
||||
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||
def normalizeATR(atr):
|
||||
"""Transform an ATR in list of integers.
|
||||
valid input formats are
|
||||
"3B A7 00 40 18 80 65 A2 08 01 01 52"
|
||||
"3B:A7:00:40:18:80:65:A2:08:01:01:52"
|
||||
|
||||
Args:
|
||||
atr: string
|
||||
Returns:
|
||||
list of bytes
|
||||
|
||||
>>> normalize("3B:A7:00:40:18:80:65:A2:08:01:01:52")
|
||||
[59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82]
|
||||
"""
|
||||
atr = atr.replace(":", "")
|
||||
atr = atr.replace(" ", "")
|
||||
|
||||
res = []
|
||||
while len(atr) >= 2:
|
||||
byte, atr = atr[:2], atr[2:]
|
||||
res.append(byte)
|
||||
if len(atr) > 0:
|
||||
raise ValueError("warning: odd string, remainder: %r" % atr)
|
||||
|
||||
atr = [int(x, 16) for x in res]
|
||||
return atr
|
||||
|
||||
|
||||
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||
def decomposeATR(atr_txt):
|
||||
"""Decompose the ATR in elementary fields
|
||||
|
||||
Args:
|
||||
atr_txt: ATR as a hex bytes string
|
||||
Returns:
|
||||
dictionary of field and values
|
||||
|
||||
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
|
||||
{ 'T0': {'value': 167},
|
||||
'TB': {1: {'value': 0}},
|
||||
'TC': {2: {'value': 24}},
|
||||
'TD': {1: {'value': 64}},
|
||||
'TS': {'value': 59},
|
||||
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
|
||||
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
|
||||
'hbn': 7}
|
||||
"""
|
||||
ATR_PROTOCOL_TYPE_T0 = 0
|
||||
atr_txt = normalizeATR(atr_txt)
|
||||
atr = {}
|
||||
|
||||
# the ATR itself as a list of integers
|
||||
atr["atr"] = atr_txt
|
||||
|
||||
# store TS and T0
|
||||
atr["TS"] = {"value": atr_txt[0]}
|
||||
TDi = atr_txt[1]
|
||||
atr["T0"] = {"value": TDi}
|
||||
hb_length = TDi & 15
|
||||
pointer = 1
|
||||
# protocol number
|
||||
pn = 1
|
||||
|
||||
# store number of historical bytes
|
||||
atr["hbn"] = TDi & 0xF
|
||||
|
||||
while pointer < len(atr_txt):
|
||||
# Check TAi is present
|
||||
if (TDi | 0xEF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TA" not in atr:
|
||||
atr["TA"] = {}
|
||||
atr["TA"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TBi is present
|
||||
if (TDi | 0xDF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TB" not in atr:
|
||||
atr["TB"] = {}
|
||||
atr["TB"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TCi is present
|
||||
if (TDi | 0xBF) == 0xFF:
|
||||
pointer += 1
|
||||
if "TC" not in atr:
|
||||
atr["TC"] = {}
|
||||
atr["TC"][pn] = {"value": atr_txt[pointer]}
|
||||
|
||||
# Check TDi is present
|
||||
if (TDi | 0x7F) == 0xFF:
|
||||
pointer += 1
|
||||
if "TD" not in atr:
|
||||
atr["TD"] = {}
|
||||
TDi = atr_txt[pointer]
|
||||
atr["TD"][pn] = {"value": TDi}
|
||||
if (TDi & 0x0F) != ATR_PROTOCOL_TYPE_T0:
|
||||
atr["TCK"] = True
|
||||
pn += 1
|
||||
else:
|
||||
break
|
||||
|
||||
# Store historical bytes
|
||||
atr["hb"] = {"value": atr_txt[pointer + 1 : pointer + 1 + hb_length]}
|
||||
|
||||
# Store TCK
|
||||
last = pointer + 1 + hb_length
|
||||
if "TCK" in atr:
|
||||
try:
|
||||
atr["TCK"] = {"value": atr_txt[last]}
|
||||
except IndexError:
|
||||
atr["TCK"] = {"value": -1}
|
||||
last += 1
|
||||
|
||||
if len(atr_txt) > last:
|
||||
atr["extra"] = atr_txt[last:]
|
||||
|
||||
if len(atr["hb"]["value"]) < hb_length:
|
||||
missing = hb_length - len(atr["hb"]["value"])
|
||||
if missing > 1:
|
||||
(t1, t2) = ("s", "are")
|
||||
else:
|
||||
(t1, t2) = ("", "is")
|
||||
atr["warning"] = "ATR is truncated: %d byte%s %s missing" % (missing, t1, t2)
|
||||
|
||||
return atr
|
||||
|
||||
|
||||
class DataObject(abc.ABC):
|
||||
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
|
||||
simply has any number of different TLVs that may occur in any order at any point, ISO 7816
|
||||
has the habit of specifying TLV data but with very spcific ordering, or specific choices of
|
||||
has the habit of specifying TLV data but with very specific ordering, or specific choices of
|
||||
tags at specific points in a stream. This class tries to represent this."""
|
||||
|
||||
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
|
||||
@@ -710,7 +839,7 @@ class TL0_DataObject(DataObject):
|
||||
|
||||
|
||||
class DataObjectCollection:
|
||||
"""A DataObjectCollection consits of multiple Data Objects identified by their tags.
|
||||
"""A DataObjectCollection consists of multiple Data Objects identified by their tags.
|
||||
A given encoded DO may contain any of them in any order, and may contain multiple instances
|
||||
of each DO."""
|
||||
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.pylint.main]
|
||||
ignored-classes = ["twisted.internet.reactor"]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pyscard
|
||||
pyserial
|
||||
pytlv
|
||||
cmd2>=1.5
|
||||
cmd2>=2.6.2,<3.0
|
||||
jsonpath-ng
|
||||
construct>=2.10.70
|
||||
bidict
|
||||
|
||||
8
setup.py
8
setup.py
@@ -49,4 +49,12 @@ setup(
|
||||
'asn1/saip/*.asn',
|
||||
],
|
||||
},
|
||||
extras_require={
|
||||
"smdpp": [
|
||||
"klein",
|
||||
"service-identity",
|
||||
"pyopenssl",
|
||||
"requests",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
16
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
Normal file
16
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
Normal file
@@ -0,0 +1,16 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgzCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
|
||||
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
|
||||
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
|
||||
HQ8BAf8EBAMCB4AwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMBQG
|
||||
A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
|
||||
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
|
||||
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
|
||||
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
|
||||
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0gA
|
||||
MEUCIQCfaGcMk+kuSJsbIyRPWttwWNftwQdHCQuu346PaiA2FAIgUrqhPw2um9gV
|
||||
C+eWHaXio7WQh5L6VgLZzNifTQcldD4=
|
||||
-----END CERTIFICATE-----
|
||||
Binary file not shown.
@@ -1,7 +1,7 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgjCCAiigAwIBAgIBCjAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MIICgjCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjUwNDIzMTUyMzA1WhcNMzUwNDIxMTUyMzA1WjAzMQ0wCwYDVQQKDARB
|
||||
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
|
||||
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
|
||||
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
|
||||
@@ -11,6 +11,6 @@ qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
|
||||
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
|
||||
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
|
||||
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
|
||||
RQIgYakBZP6y9/2iu9aZkgb89SQgfRb0TKVMGElWgtyoX2gCIQCT8o/TkAxhWCTY
|
||||
yaBDMi1L9Ub+93ef5s+S+eHLmwuudA==
|
||||
RQIhAL+1lp/hGsj87/5RqOX2u3hS/VSftDN7EPrHJJFnTXLRAiBVxemKIKmC7+W1
|
||||
+RsTY5I51R+Cyoq4l5TEU49eplo5bw==
|
||||
-----END CERTIFICATE-----
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user