personalization: add param_source.py, implement batch personalization

Implement pySim.esim.saip.personalization.BatchPersonalization,
generating N eSIM profiles from a preset configuration.

Batch parameters can be fed by a constant, incrementing, random or from
CSV rows: add pySim.esim.saip.param_source.* classes to feed such input
to each of the BatchPersonalization's ConfigurableParameter instances.

Related: SYS#6768
Change-Id: I497c60c101ea0eea980e8b1a4b1f36c0eda39002
This commit is contained in:
Neels Hofmeyr
2025-03-01 20:09:33 +01:00
parent dd42978285
commit a8f3962be3
2 changed files with 282 additions and 1 deletions

View File

@@ -0,0 +1,181 @@
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: nhofmeyr@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from pySim.utils import all_subclasses_of
class ParamSourceExn(Exception):
pass
class ParamSourceExhaustedExn(ParamSourceExn):
pass
class ParamSourceUndefinedExn(ParamSourceExn):
pass
class ParamSource:
'abstract parameter source. For usage, see personalization.BatchPersonalization.'
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
name = 'none'
@classmethod
def get_all_implementations(cls, blacklist=None):
"return all subclasses of ParamSource that have is_abstract = False."
# return a set() so that multiple inheritance does not return dups
return set(c
for c in all_subclasses_of(cls)
if ((not blacklist) or (c not in blacklist))
)
@classmethod
def from_str(cls, s:str):
'''Subclasses implement this:
if a parameter source defines some string input magic, override this function.
For example, a RandomDigitSource derives the number of digits from the string length,
so the user can enter '0000' to get a four digit random number.'''
return cls(s)
def get_next(self, csv_row:dict=None):
'''Subclasses implement this: return the next value from the parameter source.
When there are no more values from the source, raise a ParamSourceExhaustedExn.'''
raise ParamSourceExhaustedExn()
class ConstantSource(ParamSource):
'one value for all'
name = 'constant'
def __init__(self, val:str):
self.val = val
def get_next(self, csv_row:dict=None):
return self.val
class RandomDigitSource(ParamSource):
'return a different sequence of random decimal digits each'
name = 'random decimal digits'
def __init__(self, num_digits, first_value, last_value):
"""
See also from_str().
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
num_digits: number of random digits (possibly with leading zeros) to generate.
first_value, last_value: the decimal range in which to provide random digits.
"""
num_digits = int(num_digits)
first_value = int(first_value)
last_value = int(last_value)
assert num_digits > 0
assert first_value <= last_value
self.num_digits = num_digits
self.val_first_last = (first_value, last_value)
def get_next(self, csv_row:dict=None):
val = random.randint(*self.val_first_last) # TODO secure random source?
return self.val_to_digit(val)
def val_to_digit(self, val:int):
return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string
@classmethod
def from_str(cls, s:str):
if '..' in s:
first_str, last_str = s.split('..')
first_str = first_str.strip()
last_str = last_str.strip()
else:
first_str = s.strip()
last_str = None
first_value = int(first_str)
last_value = int(last_str) if last_str is not None else '9' * len(first_str)
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
class RandomHexDigitSource(ParamSource):
'return a different sequence of random hexadecimal digits each'
name = 'random hexadecimal digits'
def __init__(self, num_digits):
'see from_str()'
num_digits = int(num_digits)
if num_digits < 1:
raise ValueError('zero number of digits')
# hex digits always come in two
if (num_digits & 1) != 0:
raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}')
self.num_digits = num_digits
def get_next(self, csv_row:dict=None):
val = random.randbytes(self.num_digits // 2) # TODO secure random source?
return val
@classmethod
def from_str(cls, s:str):
return cls(num_digits=len(s.strip()))
class IncDigitSource(RandomDigitSource):
'incrementing sequence of digits'
name = 'incrementing decimal digits'
def __init__(self, *args, **kwargs):
"The arguments defining the number of digits and value range are identical to RandomDigitSource.__init__()."
super().__init__(*args, **kwargs)
self.next_val = None
self.reset()
def reset(self):
"Restart from the first value of the defined range passed to __init__()."
self.next_val = self.val_first_last[0]
def get_next(self, csv_row:dict=None):
val = self.next_val
if val is None:
raise ParamSourceExhaustedExn()
returnval = self.val_to_digit(val)
val += 1
if val > self.val_first_last[1]:
self.next_val = None
else:
self.next_val = val
return returnval
class CsvSource(ParamSource):
'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)'
name = 'from CSV'
def __init__(self, csv_column):
"""
csv_column: column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
"""
self.csv_column = csv_column
def get_next(self, csv_row:dict=None):
val = None
if csv_row:
val = csv_row.get(self.csv_column)
if not val:
raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}')
return val

View File

@@ -17,11 +17,13 @@
import abc
import io
from typing import List, Tuple
import copy
from typing import List, Tuple, Generator
from osmocom.tlv import camel_to_snake
from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid, all_subclasses_of
from pySim.esim.saip import ProfileElement, ProfileElementSequence
from pySim.esim.saip import param_source
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
"""In a list of tuples, remove all tuples whose first part equals 'unwanted_key'."""
@@ -607,3 +609,101 @@ class K(BinaryParam, AlgoConfig):
class Opc(K):
name = 'OPc'
algo_config_key = 'opc'
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
Usage example:
der_input = some_file.open('rb').read()
pes = ProfileElementSequence.from_der(der_input)
p = pers.BatchPersonalization(
n=10,
src_pes=pes,
csv_rows=get_csv_reader())
p.add_param_and_src(
personalization.Iccid(),
param_source.IncDigitSource(
num_digits=18,
first_value=123456789012340001,
last_value=123456789012340010))
# add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile
# ...
# generate all 10 profiles (from n=10 above)
for result_pes in p.generate_profiles():
upp = result_pes.to_der()
store_upp(upp)
"""
class ParamAndSrc:
'tie a ConfigurableParameter to a source of actual values'
def __init__(self, param:ConfigurableParameter, src:param_source.ParamSource):
self.param = param
self.src = src
def __init__(self,
n:int,
src_pes:ProfileElementSequence,
params:list[ParamAndSrc]=None,
csv_rows:Generator=None,
):
"""
n: number of eSIM profiles to generate.
src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only
copied.
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
profile values.
csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column
headers. This is compatible with the python csv.reader. Each row gets passed to
ParamSource.get_next(), such that ParamSource implementations can access the row items.
See param_source.CsvSource.
"""
self.n = n
self.params = params or []
self.src_pes = src_pes
self.csv_rows = csv_rows
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
def generate_profiles(self):
# get first row of CSV: column names
csv_columns = None
if self.csv_rows:
try:
csv_columns = next(self.csv_rows)
except StopIteration as e:
raise ValueError('the input CSV file appears to be empty') from e
for i in range(self.n):
csv_row = None
if self.csv_rows and csv_columns:
try:
csv_row_list = next(self.csv_rows)
except StopIteration as e:
raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
csv_row = dict(zip(csv_columns, csv_row_list))
pes = copy.deepcopy(self.src_pes)
for p in self.params:
try:
input_value = p.src.get_next(csv_row=csv_row)
assert input_value is not None
value = p.param.__class__.validate_val(input_value)
p.param.__class__.apply_val(pes, value)
except (
TypeError,
ValueError,
KeyError,
) as e:
raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}'
f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e
yield pes