mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-26 07:18:33 +03:00
smdpp: less verbose by default
Those data blobs are huge. Change-Id: I04a72b8f52417862d4dcba1f0743700dd942ef49
This commit is contained in:
@@ -47,6 +47,9 @@ from pySim.esim.es8p import *
|
||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# HACK: make this configurable
|
||||
DATA_DIR = './smdpp-data'
|
||||
HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
|
||||
@@ -106,12 +109,12 @@ def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||
|
||||
if cert_variant == 'O':
|
||||
# Old variant - use nameConstraints extension
|
||||
print("Using nameConstraints parsing for variant O certificate")
|
||||
#print("Using nameConstraints parsing for variant O certificate")
|
||||
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
||||
|
||||
else:
|
||||
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
||||
print("Using GSMA permittedEins parsing for newer certificate variant")
|
||||
#print("Using GSMA permittedEins parsing for newer certificate variant")
|
||||
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
||||
|
||||
unique_iins = list(set(permitted_iins))
|
||||
@@ -178,13 +181,13 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
||||
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
||||
)
|
||||
|
||||
print("Found nameConstraints extension (variant O)")
|
||||
# print("Found nameConstraints extension (variant O)")
|
||||
name_constraints = name_constraints_ext.value
|
||||
|
||||
# Check permittedSubtrees for IIN constraints
|
||||
if name_constraints.permitted_subtrees:
|
||||
for subtree in name_constraints.permitted_subtrees:
|
||||
print(f"Processing permitted subtree: {subtree}")
|
||||
# print(f"Processing permitted subtree: {subtree}")
|
||||
|
||||
if isinstance(subtree, x509.DirectoryName):
|
||||
for attribute in subtree.value:
|
||||
@@ -299,11 +302,11 @@ class SmDppHttpServer:
|
||||
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
||||
"""Find CI certificate for given key identifier."""
|
||||
for cert in self.ci_certs:
|
||||
print("cert: %s" % cert)
|
||||
logger.debug("cert: %s" % cert)
|
||||
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
||||
print(subject_exts)
|
||||
logger.debug(subject_exts)
|
||||
subject_pkid = subject_exts[0].value
|
||||
print(subject_pkid)
|
||||
logger.debug(subject_pkid)
|
||||
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
||||
return cert
|
||||
return None
|
||||
@@ -356,7 +359,7 @@ class SmDppHttpServer:
|
||||
validate_request_headers(request)
|
||||
|
||||
content = json.loads(request.content.read())
|
||||
print("Rx JSON: %s" % json.dumps(content))
|
||||
#logger.debug("Rx JSON: %s" % json.dumps(content))
|
||||
set_headers(request)
|
||||
|
||||
output = func(self, request, content)
|
||||
@@ -364,7 +367,7 @@ class SmDppHttpServer:
|
||||
return ''
|
||||
|
||||
build_resp_header(output)
|
||||
print("Tx JSON: %s" % json.dumps(output))
|
||||
logger.debug("Tx JSON: %s" % json.dumps(output)[:200])
|
||||
return json.dumps(output)
|
||||
return _api_wrapper
|
||||
|
||||
@@ -383,7 +386,7 @@ class SmDppHttpServer:
|
||||
|
||||
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
||||
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
||||
print("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
|
||||
#euiccInfo1['svn']
|
||||
|
||||
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
||||
@@ -458,7 +461,7 @@ class SmDppHttpServer:
|
||||
|
||||
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
||||
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
||||
print("Rx %s: %s" % authenticateServerResp)
|
||||
logger.debug("Rx %s: %s" % authenticateServerResp)
|
||||
if authenticateServerResp[0] == 'authenticateResponseError':
|
||||
r_err = authenticateServerResp[1]
|
||||
#r_err['transactionId']
|
||||
@@ -589,7 +592,7 @@ class SmDppHttpServer:
|
||||
|
||||
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
||||
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
||||
print("Rx %s: %s" % prepDownloadResp)
|
||||
logger.debug("Rx %s: %s" % prepDownloadResp)
|
||||
|
||||
if prepDownloadResp[0] == 'downloadResponseError':
|
||||
r_err = prepDownloadResp[1]
|
||||
@@ -611,7 +614,7 @@ class SmDppHttpServer:
|
||||
|
||||
# store otPK.EUICC.ECKA in session state
|
||||
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
||||
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||
|
||||
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
||||
# Reference value of CERT.DPpb.ECDDSA
|
||||
@@ -619,8 +622,8 @@ class SmDppHttpServer:
|
||||
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
||||
# extract the public key in (hopefully) the right format for the ES8+ interface
|
||||
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||
|
||||
ss.host_id = b'mahlzeit'
|
||||
|
||||
@@ -661,7 +664,7 @@ class SmDppHttpServer:
|
||||
request.setResponseCode(204)
|
||||
pendingNotification_bin = b64decode(content['pendingNotification'])
|
||||
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
||||
print("Rx %s: %s" % pendingNotification)
|
||||
logger.debug("Rx %s: %s" % pendingNotification)
|
||||
if pendingNotification[0] == 'profileInstallationResult':
|
||||
profileInstallRes = pendingNotification[1]
|
||||
pird = profileInstallRes['profileInstallationResultData']
|
||||
@@ -714,7 +717,7 @@ class SmDppHttpServer:
|
||||
@rsp_api_wrapper
|
||||
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
||||
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
||||
print("Rx JSON: %s" % content)
|
||||
logger.debug("Rx JSON: %s" % content)
|
||||
transactionId = content['transactionId']
|
||||
|
||||
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
||||
@@ -724,7 +727,7 @@ class SmDppHttpServer:
|
||||
|
||||
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
||||
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
||||
print("Rx %s: %s" % cancelSessionResponse)
|
||||
logger.debug("Rx %s: %s" % cancelSessionResponse)
|
||||
|
||||
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
||||
# FIXME: print some error
|
||||
@@ -760,8 +763,11 @@ def main(argv):
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
||||
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
|
||||
|
||||
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
||||
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=False)
|
||||
if(args.nossl):
|
||||
|
||||
Reference in New Issue
Block a user