personalization: implement UppAudit and BatchAudit

Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
This commit is contained in:
Neels Hofmeyr
2025-04-18 05:19:34 +02:00
parent 588d06cd9d
commit 9baafc1771

View File

@@ -20,6 +20,7 @@ import io
import os
import copy
import re
import pprint
from typing import List, Tuple, Generator, Optional
from osmocom.tlv import camel_to_snake
@@ -278,6 +279,13 @@ class ConfigurableParameter:
and ((not blacklist) or (c not in blacklist)))
)
@classmethod
def is_super_of(cls, other_class):
try:
return issubclass(other_class, cls)
except TypeError:
return False
class DecimalParam(ConfigurableParameter):
"""Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of
validate_val() is a string with only decimal digits 0-9, in the required length with leading zeros if necessary.
@@ -1161,3 +1169,208 @@ class BatchPersonalization:
f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e
yield pes
class UppAudit(dict):
"""
Key-value pairs collected from a single UPP DER or PES.
UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP.
"""
@classmethod
def from_der(cls, der: bytes, params: List):
'''return a dict of parameter name and set of parameter values found in a DER encoded profile.
Read all parameters listed in params. This calls only classmethods, so each entry in params can either be a class or
an instance of a class, of a (non-abstract) ConfigurableParameter subclass. For example, params = [Imsi, ] is
equivalent to params = [Imsi(), ].'''
upp_audit = cls()
upp_audit['der_size'] = set((len(der), ))
pes = ProfileElementSequence.from_der(der)
for param in params:
key = param.get_name()
if key in upp_audit:
raise ValueError(f'UPP audit: there seem to be two conflicting parameters with the name {key!r}: '
+ ', '.join(f"{param.get_name()}={param.__name__}" for param in params))
try:
for valdict in param.get_values_from_pes(pes):
upp_audit.add_values(valdict)
except (TypeError, ValueError) as e:
raise ValueError(f'Error during audit for parameter {key}: {e}') from e
return upp_audit
def get_single_val(self, param, validate=True, allow_absent=False, absent_val=None):
"""
Return the audit's value for the given ConfigurableParameter class.
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
return that value. When they do not agree, raise a ValueError.
"""
cp = None
if ConfigurableParameter.is_super_of(param):
cp = param
key = param.name
else:
key = param
assert isinstance(key, str)
v = self.get(key)
if v is None and allow_absent:
return absent_val
if not isinstance(v, set):
raise ValueError(f'audit value should be a set(), got {v!r}')
if len(v) != 1:
raise ValueError(f'expected a single value for {key}, got {v!r}')
v = tuple(v)[0]
if validate and cp:
# run value by the ConfigurableParameter's validation.
# (do not use the returned value, because the returned value is encoded for a PES)
cp.validate_val(v)
return v
@staticmethod
def audit_val_to_str(v):
"""
Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
audit values are always python sets. Turn it into a nice string representation: only the value when it is
unambiguous, otherwise a list of the ambiguous values.
A value may also be completely absent, then return 'not present'.
"""
def try_single_val(w):
'change single-entry sets to just the single value'
if isinstance(w, set):
if len(w) == 1:
return tuple(w)[0]
if len(w) == 0:
return None
return w
v = try_single_val(v)
if isinstance(v, bytes):
v = bytes_to_hexstr(v)
if v is None:
return 'not present'
return str(v)
def get_val_str(self, key):
"""Return a string of the value stored for the given key"""
return UppAudit.audit_val_to_str(self.get(key))
def add_values(self, src:dict):
"""self and src are both a dict of sets.
For example from
self == { 'a': set((123,)) }
and
src == { 'a': set((456,)), 'b': set((789,)) }
then after this function call:
self == { 'a': set((123, 456,)), 'b': set((789,)) }
"""
assert isinstance(src, dict)
for key, srcvalset in src.items():
dstvalset = self.get(key)
if dstvalset is None:
dstvalset = set()
self[key] = dstvalset
dstvalset.add(srcvalset)
def __str__(self):
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
class BatchAudit(list):
"""
Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
Produce an output CSV.
Usage example:
ba = BatchAudit(params=(personalization.Iccid, ))
for upp_der in upps:
ba.add_audit(upp_der)
print(ba.summarize())
with open('output.csv', 'wb') as csv_data:
csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
csv.writer(csv_str).writerows( ba.to_csv_rows() )
csv_str.flush()
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
"""
def __init__(self, params:List=None):
if params is None:
params = ConfigurableParameter.get_all_implementations()
self.params = params
def add_audit(self, upp_der:bytes):
audit = UppAudit.from_der(upp_der, self.params)
self.append(audit)
return audit
def summarize(self):
batch_audit = UppAudit()
audits = self
if len(audits) > 2:
val_sep = ', ..., '
else:
val_sep = ', '
first_audit = None
last_audit = None
if len(audits) >= 1:
first_audit = audits[0]
if len(audits) >= 2:
last_audit = audits[-1]
if first_audit:
if last_audit:
for key in first_audit.keys():
first_val = first_audit.get_val_str(key)
last_val = last_audit.get_val_str(key)
if first_val == last_val:
val = first_val
else:
val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
val = val_sep_with_newline.join((first_val, last_val))
batch_audit[key] = val
else:
batch_audit.update(first_audit)
return batch_audit
def to_csv_rows(self, headers=True):
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
params = tuple(sorted(self.params, key=lambda param: param.get_name()))
if headers:
yield (p.get_name() for p in params)
for audit in self:
yield (audit.get_single_val(p, allow_absent=True, absent_val="") for p in params)
def bytes_to_hexstr(b:bytes, sep=''):
return sep.join(f'{x:02x}' for x in b)
def esim_profile_introspect(upp):
pes = ProfileElementSequence.from_der(upp.read())
d = {}
d['upp'] = repr(pes)
def show_bytes_as_hexdump(item):
if isinstance(item, bytes):
return bytes_to_hexstr(item)
if isinstance(item, list):
return list(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, tuple):
return tuple(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, dict):
d = {}
for k, v in item.items():
d[k] = show_bytes_as_hexdump(v)
return d
return item
l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
d['pp'] = pprint.pformat(l, width=120)
return d