diff --git a/pySim/esim/saip/batch.py b/pySim/esim/saip/batch.py new file mode 100644 index 00000000..2af05790 --- /dev/null +++ b/pySim/esim/saip/batch.py @@ -0,0 +1,117 @@ +"""Implementation of Personalization of eSIM profiles in SimAlliance/TCA Interoperable Profile: + Run a batch of N personalizations""" + +# (C) 2025-2026 by sysmocom - s.f.m.c. GmbH +# +# Author: nhofmeyr@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import copy +from typing import Generator +from pySim.esim.saip.personalization import ConfigurableParameter +from pySim.esim.saip import param_source +from pySim.esim.saip import ProfileElementSequence, ProfileElementSD + +class BatchPersonalization: + """Produce a series of eSIM profiles from predefined parameters. + Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource. + + Usage example: + + der_input = some_file.open('rb').read() + pes = ProfileElementSequence.from_der(der_input) + p = pers.BatchPersonalization( + n=10, + src_pes=pes, + csv_rows=get_csv_reader()) + + p.add_param_and_src( + personalization.Iccid(), + param_source.IncDigitSource( + num_digits=18, + first_value=123456789012340001, + last_value=123456789012340010)) + + # add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile + # ... + + # generate all 10 profiles (from n=10 above) + for result_pes in p.generate_profiles(): + upp = result_pes.to_der() + store_upp(upp) + """ + + class ParamAndSrc: + 'tie a ConfigurableParameter to a source of actual values' + def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource): + self.param = param + self.src = src + + def __init__(self, + n: int, + src_pes: ProfileElementSequence, + params: list[ParamAndSrc]=None, + csv_rows: Generator=None, + ): + """ + n: number of eSIM profiles to generate. + src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only + copied. + params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in + profile values. + csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column + headers. This is compatible with the python csv.reader. Each row gets passed to + ParamSource.get_next(), such that ParamSource implementations can access the row items. + See param_source.CsvSource. + """ + self.n = n + self.params = params or [] + self.src_pes = src_pes + self.csv_rows = csv_rows + + def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource): + self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src)) + + def generate_profiles(self): + # get first row of CSV: column names + csv_columns = None + if self.csv_rows: + try: + csv_columns = next(self.csv_rows) + except StopIteration as e: + raise ValueError('the input CSV file appears to be empty') from e + + for i in range(self.n): + csv_row = None + if self.csv_rows and csv_columns: + try: + csv_row_list = next(self.csv_rows) + except StopIteration as e: + raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e + + csv_row = dict(zip(csv_columns, csv_row_list)) + + pes = copy.deepcopy(self.src_pes) + + for p in self.params: + try: + input_value = p.src.get_next(csv_row=csv_row) + assert input_value is not None + value = p.param.__class__.validate_val(input_value) + p.param.__class__.apply_val(pes, value) + except Exception as e: + raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}') from e + + yield pes diff --git a/pySim/esim/saip/param_source.py b/pySim/esim/saip/param_source.py new file mode 100644 index 00000000..77f2e1cc --- /dev/null +++ b/pySim/esim/saip/param_source.py @@ -0,0 +1,174 @@ +# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization. +# +# (C) 2025 by sysmocom - s.f.m.c. GmbH +# +# Author: nhofmeyr@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import random + +class ParamSourceExn(Exception): + pass + +class ParamSourceExhaustedExn(ParamSourceExn): + pass + +class ParamSourceUndefinedExn(ParamSourceExn): + pass + +class ParamSource: + """abstract parameter source. For usage, see personalization.BatchPersonalization.""" + + # This name should be short but descriptive, useful for a user interface, like 'random decimal digits'. + name = "none" + + @classmethod + def from_str(cls, s:str): + """Subclasses implement this: + if a parameter source defines some string input magic, override this function. + For example, a RandomDigitSource derives the number of digits from the string length, + so the user can enter '0000' to get a four digit random number.""" + return cls(s) + + def get_next(self, csv_row:dict=None): + """Subclasses implement this: return the next value from the parameter source. + When there are no more values from the source, raise a ParamSourceExhaustedExn. + This default implementation is an empty source.""" + raise ParamSourceExhaustedExn() + + +class ConstantSource(ParamSource): + """one value for all""" + name = "constant" + + def __init__(self, val:str): + self.val = val + + def get_next(self, csv_row:dict=None): + return self.val + +class DecimalRangeSource(ParamSource): + """abstract: decimal numbers with a value range""" + + def __init__(self, num_digits, first_value, last_value): + """ + See also from_str(). + + All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine. + num_digits: fixed number of digits (possibly with leading zeros) to generate. + first_value, last_value: the decimal range in which to provide digits. + """ + num_digits = int(num_digits) + first_value = int(first_value) + last_value = int(last_value) + assert num_digits > 0 + assert first_value <= last_value + self.num_digits = num_digits + self.val_first_last = (first_value, last_value) + + def val_to_digit(self, val:int): + return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string + + @classmethod + def from_str(cls, s:str): + if ".." in s: + first_str, last_str = s.split("..") + first_str = first_str.strip() + last_str = last_str.strip() + else: + first_str = s.strip() + last_str = None + + first_value = int(first_str) + last_value = int(last_str) if last_str is not None else "9" * len(first_str) + return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value) + +class RandomDigitSource(DecimalRangeSource): + """return a different sequence of random decimal digits each""" + name = "random decimal digits" + + def get_next(self, csv_row:dict=None): + val = random.randint(*self.val_first_last) # TODO secure random source? + return self.val_to_digit(val) + +class RandomHexDigitSource(ParamSource): + """return a different sequence of random hexadecimal digits each""" + name = "random hexadecimal digits" + + def __init__(self, num_digits): + """see from_str()""" + num_digits = int(num_digits) + if num_digits < 1: + raise ValueError("zero number of digits") + # hex digits always come in two + if (num_digits & 1) != 0: + raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}") + self.num_digits = num_digits + + def get_next(self, csv_row:dict=None): + val = random.randbytes(self.num_digits // 2) # TODO secure random source? + return val + + @classmethod + def from_str(cls, s:str): + return cls(num_digits=len(s.strip())) + +class IncDigitSource(DecimalRangeSource): + """incrementing sequence of digits""" + name = "incrementing decimal digits" + + def __init__(self, num_digits, first_value, last_value): + super().__init__(num_digits, first_value, last_value) + self.next_val = None + self.reset() + + def reset(self): + """Restart from the first value of the defined range passed to __init__().""" + self.next_val = self.val_first_last[0] + + def get_next(self, csv_row:dict=None): + val = self.next_val + if val is None: + raise ParamSourceExhaustedExn() + + returnval = self.val_to_digit(val) + + val += 1 + if val > self.val_first_last[1]: + self.next_val = None + else: + self.next_val = val + + return returnval + +class CsvSource(ParamSource): + """apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)""" + name = "from CSV" + + def __init__(self, csv_column): + """ + csv_column: column name indicating the column to use for this parameter. + This name is used in get_next(): the caller passes the current CSV row to get_next(), from which + CsvSource picks the column with the name matching csv_column. + """ + self.csv_column = csv_column + + def get_next(self, csv_row:dict=None): + val = None + if csv_row: + val = csv_row.get(self.csv_column) + if not val: + raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}") + return val