diff --git a/pySim/esim/saip/param_source.py b/pySim/esim/saip/param_source.py new file mode 100644 index 00000000..7e4d9d1d --- /dev/null +++ b/pySim/esim/saip/param_source.py @@ -0,0 +1,181 @@ +# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization. +# +# (C) 2025 by sysmocom - s.f.m.c. GmbH +# +# Author: nhofmeyr@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import random +from pySim.utils import all_subclasses_of + +class ParamSourceExn(Exception): + pass + +class ParamSourceExhaustedExn(ParamSourceExn): + pass + +class ParamSourceUndefinedExn(ParamSourceExn): + pass + +class ParamSource: + 'abstract parameter source. For usage, see personalization.BatchPersonalization.' + + # This name should be short but descriptive, useful for a user interface, like 'random decimal digits'. + name = 'none' + + @classmethod + def get_all_implementations(cls, blacklist=None): + "return all subclasses of ParamSource that have is_abstract = False." + # return a set() so that multiple inheritance does not return dups + return set(c + for c in all_subclasses_of(cls) + if ((not blacklist) or (c not in blacklist)) + ) + + @classmethod + def from_str(cls, s:str): + '''Subclasses implement this: + if a parameter source defines some string input magic, override this function. + For example, a RandomDigitSource derives the number of digits from the string length, + so the user can enter '0000' to get a four digit random number.''' + return cls(s) + + def get_next(self, csv_row:dict=None): + '''Subclasses implement this: return the next value from the parameter source. + When there are no more values from the source, raise a ParamSourceExhaustedExn.''' + raise ParamSourceExhaustedExn() + + +class ConstantSource(ParamSource): + 'one value for all' + name = 'constant' + + def __init__(self, val:str): + self.val = val + + def get_next(self, csv_row:dict=None): + return self.val + +class RandomDigitSource(ParamSource): + 'return a different sequence of random decimal digits each' + name = 'random decimal digits' + + def __init__(self, num_digits, first_value, last_value): + """ + See also from_str(). + + All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine. + num_digits: number of random digits (possibly with leading zeros) to generate. + first_value, last_value: the decimal range in which to provide random digits. + """ + num_digits = int(num_digits) + first_value = int(first_value) + last_value = int(last_value) + assert num_digits > 0 + assert first_value <= last_value + self.num_digits = num_digits + self.val_first_last = (first_value, last_value) + + def get_next(self, csv_row:dict=None): + val = random.randint(*self.val_first_last) # TODO secure random source? + return self.val_to_digit(val) + + def val_to_digit(self, val:int): + return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string + + @classmethod + def from_str(cls, s:str): + if '..' in s: + first_str, last_str = s.split('..') + first_str = first_str.strip() + last_str = last_str.strip() + else: + first_str = s.strip() + last_str = None + + first_value = int(first_str) + last_value = int(last_str) if last_str is not None else '9' * len(first_str) + return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value) + +class RandomHexDigitSource(ParamSource): + 'return a different sequence of random hexadecimal digits each' + name = 'random hexadecimal digits' + + def __init__(self, num_digits): + 'see from_str()' + num_digits = int(num_digits) + if num_digits < 1: + raise ValueError('zero number of digits') + # hex digits always come in two + if (num_digits & 1) != 0: + raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}') + self.num_digits = num_digits + + def get_next(self, csv_row:dict=None): + val = random.randbytes(self.num_digits // 2) # TODO secure random source? + return val + + @classmethod + def from_str(cls, s:str): + return cls(num_digits=len(s.strip())) + +class IncDigitSource(RandomDigitSource): + 'incrementing sequence of digits' + name = 'incrementing decimal digits' + + def __init__(self, *args, **kwargs): + "The arguments defining the number of digits and value range are identical to RandomDigitSource.__init__()." + super().__init__(*args, **kwargs) + self.next_val = None + self.reset() + + def reset(self): + "Restart from the first value of the defined range passed to __init__()." + self.next_val = self.val_first_last[0] + + def get_next(self, csv_row:dict=None): + val = self.next_val + if val is None: + raise ParamSourceExhaustedExn() + + returnval = self.val_to_digit(val) + + val += 1 + if val > self.val_first_last[1]: + self.next_val = None + else: + self.next_val = val + + return returnval + +class CsvSource(ParamSource): + 'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)' + name = 'from CSV' + + def __init__(self, csv_column): + """ + csv_column: column name indicating the column to use for this parameter. + This name is used in get_next(): the caller passes the current CSV row to get_next(), from which + CsvSource picks the column with the name matching csv_column. + """ + self.csv_column = csv_column + + def get_next(self, csv_row:dict=None): + val = None + if csv_row: + val = csv_row.get(self.csv_column) + if not val: + raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}') + return val diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py index 8c2644ba..74089638 100644 --- a/pySim/esim/saip/personalization.py +++ b/pySim/esim/saip/personalization.py @@ -17,11 +17,13 @@ import abc import io -from typing import List, Tuple +import copy +from typing import List, Tuple, Generator from osmocom.tlv import camel_to_snake from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid, all_subclasses_of from pySim.esim.saip import ProfileElement, ProfileElementSequence +from pySim.esim.saip import param_source def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]: """In a list of tuples, remove all tuples whose first part equals 'unwanted_key'.""" @@ -607,3 +609,101 @@ class K(BinaryParam, AlgoConfig): class Opc(K): name = 'OPc' algo_config_key = 'opc' + + +class BatchPersonalization: + """Produce a series of eSIM profiles from predefined parameters. + Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource. + + Usage example: + + der_input = some_file.open('rb').read() + pes = ProfileElementSequence.from_der(der_input) + p = pers.BatchPersonalization( + n=10, + src_pes=pes, + csv_rows=get_csv_reader()) + + p.add_param_and_src( + personalization.Iccid(), + param_source.IncDigitSource( + num_digits=18, + first_value=123456789012340001, + last_value=123456789012340010)) + + # add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile + # ... + + # generate all 10 profiles (from n=10 above) + for result_pes in p.generate_profiles(): + upp = result_pes.to_der() + store_upp(upp) + """ + + class ParamAndSrc: + 'tie a ConfigurableParameter to a source of actual values' + def __init__(self, param:ConfigurableParameter, src:param_source.ParamSource): + self.param = param + self.src = src + + def __init__(self, + n:int, + src_pes:ProfileElementSequence, + params:list[ParamAndSrc]=None, + csv_rows:Generator=None, + ): + """ + n: number of eSIM profiles to generate. + src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only + copied. + params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in + profile values. + csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column + headers. This is compatible with the python csv.reader. Each row gets passed to + ParamSource.get_next(), such that ParamSource implementations can access the row items. + See param_source.CsvSource. + """ + self.n = n + self.params = params or [] + self.src_pes = src_pes + self.csv_rows = csv_rows + + def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource): + self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src)) + + def generate_profiles(self): + # get first row of CSV: column names + csv_columns = None + if self.csv_rows: + try: + csv_columns = next(self.csv_rows) + except StopIteration as e: + raise ValueError('the input CSV file appears to be empty') from e + + for i in range(self.n): + csv_row = None + if self.csv_rows and csv_columns: + try: + csv_row_list = next(self.csv_rows) + except StopIteration as e: + raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e + + csv_row = dict(zip(csv_columns, csv_row_list)) + + pes = copy.deepcopy(self.src_pes) + + for p in self.params: + try: + input_value = p.src.get_next(csv_row=csv_row) + assert input_value is not None + value = p.param.__class__.validate_val(input_value) + p.param.__class__.apply_val(pes, value) + except ( + TypeError, + ValueError, + KeyError, + ) as e: + raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}' + f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e + + yield pes