diff --git a/pySim/esim/saip/batch.py b/pySim/esim/saip/batch.py index 2af05790..236f6191 100644 --- a/pySim/esim/saip/batch.py +++ b/pySim/esim/saip/batch.py @@ -115,3 +115,218 @@ class BatchPersonalization: raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}') from e yield pes + + +class UppAudit(dict): + """ + Key-value pairs collected from a single UPP DER or PES. + + UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP. + """ + + @classmethod + def from_der(cls, der: bytes, params: List, der_size=False): + '''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note: + some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns + both 'IMSI' and 'IMSI-ACC' parameters. + + e.g. + UppAudit.from_der(my_der, [Imsi, ]) + --> {'IMSI': '001010000000023', 'IMSI-ACC': '5'} + + (where 'IMSI' == Imsi.name) + + Read all parameters listed in params. params is a list of either ConfigurableParameter classes or + ConfigurableParameter class instances. This calls only classmethods, so each entry in params can either be the + class itself, or a class-instance of, a (non-abstract) ConfigurableParameter subclass. + For example, params = [Imsi, ] is equivalent to params = [Imsi(), ]. + + For der_size=True, also include a {'der_size':12345} entry. + ''' + + # make an instance of this class + upp_audit = cls() + + if der_size: + upp_audit['der_size'] = set((len(der), )) + + pes = ProfileElementSequence.from_der(der) + for param in params: + try: + for valdict in param.get_values_from_pes(pes): + upp_audit.add_values(valdict) + except (TypeError, ValueError) as e: + raise ValueError(f'Error during audit for parameter {param}: {e}') from e + return upp_audit + + def get_single_val(self, key, validate=True, allow_absent=False, absent_val=None): + """ + Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC'). + Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value, + return that value. When they do not agree, raise a ValueError. + """ + # key should be a string, but if someone passes a ConfigurableParameter, just use its default name + if ConfigurableParameter.is_super_of(key): + key = key.get_name() + + assert isinstance(key, str) + v = self.get(key) + if v is None and allow_absent: + return absent_val + if not isinstance(v, set): + raise ValueError(f'audit value should be a set(), got {v!r}') + if len(v) != 1: + raise ValueError(f'expected a single value for {key}, got {v!r}') + v = tuple(v)[0] + return v + + @staticmethod + def audit_val_to_str(v): + """ + Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values, + audit values are always python sets. Turn it into a nice string representation: only the value when it is + unambiguous, otherwise a list of the ambiguous values. + A value may also be completely absent, then return 'not present'. + """ + def try_single_val(w): + 'change single-entry sets to just the single value' + if isinstance(w, set): + if len(w) == 1: + return tuple(w)[0] + if len(w) == 0: + return None + return w + + v = try_single_val(v) + if isinstance(v, bytes): + v = bytes_to_hexstr(v) + if v is None: + return 'not present' + return str(v) + + def get_val_str(self, key): + """Return a string of the value stored for the given key""" + return UppAudit.audit_val_to_str(self.get(key)) + + def add_values(self, src:dict): + """self and src are both a dict of sets. + For example from + self == { 'a': set((123,)) } + and + src == { 'a': set((456,)), 'b': set((789,)) } + then after this function call: + self == { 'a': set((123, 456,)), 'b': set((789,)) } + """ + assert isinstance(src, dict) + for key, srcvalset in src.items(): + dstvalset = self.get(key) + if dstvalset is None: + dstvalset = set() + self[key] = dstvalset + dstvalset.add(srcvalset) + + def __str__(self): + return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys())) + +class BatchAudit(list): + """ + Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization. + Produce an output CSV. + + Usage example: + + ba = BatchAudit(params=(personalization.Iccid, )) + for upp_der in upps: + ba.add_audit(upp_der) + print(ba.summarize()) + + with open('output.csv', 'wb') as csv_data: + csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='') + csv.writer(csv_str).writerows( ba.to_csv_rows() ) + csv_str.flush() + + BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances. + """ + + def __init__(self, params:List): + assert params + self.params = params + + def add_audit(self, upp_der:bytes): + audit = UppAudit.from_der(upp_der, self.params) + self.append(audit) + return audit + + def summarize(self): + batch_audit = UppAudit() + + audits = self + + if len(audits) > 2: + val_sep = ', ..., ' + else: + val_sep = ', ' + + first_audit = None + last_audit = None + if len(audits) >= 1: + first_audit = audits[0] + if len(audits) >= 2: + last_audit = audits[-1] + + if first_audit: + if last_audit: + for key in first_audit.keys(): + first_val = first_audit.get_val_str(key) + last_val = last_audit.get_val_str(key) + + if first_val == last_val: + val = first_val + else: + val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}" + val = val_sep_with_newline.join((first_val, last_val)) + batch_audit[key] = val + else: + batch_audit.update(first_audit) + + return batch_audit + + def to_csv_rows(self, headers=True, sort_key=None): + '''generator that yields all audits' values as rows, useful feed to a csv.writer.''' + columns = set() + for audit in self: + columns.update(audit.keys()) + + columns = tuple(sorted(columns, key=sort_key)) + + if headers: + yield columns + + for audit in self: + yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns) + +def bytes_to_hexstr(b:bytes, sep=''): + return sep.join(f'{x:02x}' for x in b) + +def esim_profile_introspect(upp): + pes = ProfileElementSequence.from_der(upp.read()) + d = {} + d['upp'] = repr(pes) + + def show_bytes_as_hexdump(item): + if isinstance(item, bytes): + return bytes_to_hexstr(item) + if isinstance(item, list): + return list(show_bytes_as_hexdump(i) for i in item) + if isinstance(item, tuple): + return tuple(show_bytes_as_hexdump(i) for i in item) + if isinstance(item, dict): + d = {} + for k, v in item.items(): + d[k] = show_bytes_as_hexdump(v) + return d + return item + + l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes) + d['pp'] = pprint.pformat(l, width=120) + return d diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py index 75d619b4..73e746b5 100644 --- a/pySim/esim/saip/personalization.py +++ b/pySim/esim/saip/personalization.py @@ -19,6 +19,7 @@ import abc import io import os import re +import pprint from typing import List, Tuple, Generator, Optional from osmocom.tlv import camel_to_snake @@ -260,6 +261,13 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta): ''' return cls.get_len_range()[1] or 16 + @classmethod + def is_super_of(cls, other_class): + try: + return issubclass(other_class, cls) + except TypeError: + return False + class DecimalParam(ConfigurableParameter): """Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of validate_val() is a string with only decimal digits 0-9, in the required length with leading zeros if necessary.