Compare commits
5 Commits
neels/saip
...
pmaier/pgs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a3469bc03b | ||
|
|
c118012fb9 | ||
|
|
45bffb53f9 | ||
|
|
cc15b2b4c3 | ||
|
|
11dfad88e6 |
@@ -100,6 +100,7 @@ Please install the following dependencies:
|
|||||||
- pyyaml >= 5.1
|
- pyyaml >= 5.1
|
||||||
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
|
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
|
||||||
- termcolor
|
- termcolor
|
||||||
|
- psycopg2-binary
|
||||||
|
|
||||||
Example for Debian:
|
Example for Debian:
|
||||||
```sh
|
```sh
|
||||||
|
|||||||
286
contrib/csv-to-pgsql.py
Executable file
286
contrib/csv-to-pgsql.py
Executable file
@@ -0,0 +1,286 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import csv
|
||||||
|
import sys
|
||||||
|
import yaml
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2.sql import Identifier, SQL
|
||||||
|
from pathlib import Path
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
from packaging import version
|
||||||
|
|
||||||
|
log = PySimLogger.get("CSV2PGQSL")
|
||||||
|
|
||||||
|
class CardKeyDatabase:
    """PostgreSQL-backed storage for card-individual key material."""

    def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False):
        """
        Initialize database connection and set the table which shall be used as storage for the card key data.
        In case the specified table does not exist yet it can be created using the create_table parameter.

        New tables are always minimal tables which follow a pre-defined table scheme. The user may extend the table
        with additional columns using add_cols() later.

        Args:
                config_filename : path of the yaml config file (host, db_name, table_names, db_users)
                table_name : name of the database table to use; must contain "uicc_keys" or "euicc_keys"
                create_table : create the table in case it does not exist yet (requires admin=True)
                admin : connect as the 'admin' user instead of the 'importer' user
        """

        def user_from_config_file(config, role: str) -> tuple[str, str]:
            # Look up name and password of the given role in the 'db_users' section of the config file.
            db_users = config.get('db_users')
            user = db_users.get(role)
            if user is None:
                raise ValueError("user for role '%s' not set up in config file." % role)
            return user.get('name'), user.get('pass')

        self.table = table_name
        self.cols = None

        # Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys".
        # This convention will allow us to deduct the table type from the table name.
        if "euicc_keys" not in table_name and "uicc_keys" not in table_name:
            raise ValueError("Table name (%s) should contain the substring \"uicc_keys\" or \"euicc_keys\"" % table_name)

        # Read config file
        log.info("Using config file: %s", config_filename)
        with open(config_filename, "r") as cfg:
            config = yaml.load(cfg, Loader=yaml.FullLoader)
        host = config.get('host')
        log.info("Database host: %s", host)
        db_name = config.get('db_name')
        log.info("Database name: %s", db_name)
        # Tolerate a config file without a 'table_names' list; this only affects the warning below.
        table_names = config.get('table_names') or []
        username_admin, password_admin = user_from_config_file(config, 'admin')
        username_importer, password_importer = user_from_config_file(config, 'importer')
        username_reader, _ = user_from_config_file(config, 'reader')

        # Switch between admin and importer user
        if admin:
            username, password = username_admin, password_admin
        else:
            username, password = username_importer, password_importer

        # Create database connection
        log.info("Database user: %s", username)
        self.conn = psycopg2.connect(dbname=db_name, user=username, password=password, host=host)
        self.cur = self.conn.cursor()

        # In the context of this tool it is not relevant if the table name is present in the config file. However,
        # pySim-shell.py will require the table name to be configured properly to access the database table.
        if self.table not in table_names:
            log.warning("Specified table name (%s) is not yet present in config file (required for access from pySim-shell.py)",
                        self.table)

        # Create a new minimal database table of the specified table type.
        if create_table:
            if not admin:
                raise ValueError("creation of new table refused, use option --admin and try again.")
            if "euicc_keys" in self.table:
                self.__create_table(username_reader, username_importer, ['EID'])
            elif "uicc_keys" in self.table:
                self.__create_table(username_reader, username_importer, ['ICCID', 'IMSI'])

        # Ensure a table with the specified name exists
        log.info("Database table: %s", self.table)
        if self.get_cols() == []:
            raise ValueError("Table name (%s) does not exist yet" % self.table)
        log.info("Database table columns: %s", str(self.get_cols()))
|
||||||
|
|
||||||
|
def __create_table(self, user_reader:str, user_importer:str, cols:list[str]):
    """
    Initialize a new table. New tables are always minimal tables with one primary key and additional index columns.
    Non index-columns may be added later using method add_cols().

    Args:
            user_reader : database user that is granted SELECT on the new table
            user_importer : database user that is granted INSERT on the new table
            cols : column names; the first becomes the primary key, the others become indexed columns
    """

    # Create table columns with primary key
    query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table.lower()),
                                                                  Identifier(cols[0].lower()))
    for c in cols[1:]:
        query += SQL(", {} VARCHAR").format(Identifier(c.lower()))
    query += SQL(");")
    self.cur.execute(query)

    # Create indexes for all other columns
    for c in cols[1:]:
        self.cur.execute(SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()),
                                                                  Identifier(self.table.lower()),
                                                                  Identifier(c.lower())))

    # Set permissions
    self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table.lower()),
                                                             Identifier(user_importer)))
    self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table.lower()),
                                                             Identifier(user_reader)))

    log.info("New database table created: %s", str(self.table.lower()))
|
||||||
|
|
||||||
|
def get_cols(self) -> list[str]:
    """
    Get a list of all columns available in the current table scheme.

    Returns:
            list with column names (in uppercase) of the database table
    """

    # Serve the cached column list when it was fetched before.
    if self.cols:
        return self.cols

    # Otherwise query the information schema for the column set of our table.
    self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table.lower(),))
    fetched = [row[0].upper() for row in self.cur.fetchall()]
    self.cols = fetched
    return fetched
|
||||||
|
|
||||||
|
def get_missing_cols(self, cols_expected:list[str]) -> list[str]:
    """
    Check if the current table scheme lacks any of the given expected columns.

    Returns:
            list with the missing columns.
    """

    # Plain set difference: everything expected that the table does not have yet.
    have = set(self.get_cols())
    return [c for c in set(cols_expected) if c not in have]
|
||||||
|
|
||||||
|
def add_cols(self, cols:list[str]):
    """
    Update the current table scheme with additional columns. In case the columns to add already exist, the
    table schema is not changed.

    Args:
            cols : list with column names to add
    """

    cols_missing = self.get_missing_cols(cols)

    # Depending on the table type (see constructor), we either have a primary key 'ICCID' (for UICC data), or 'EID'
    # (for eUICC data). Both table formats store different types of data and have rather different columns also.
    # Let's prevent accidentally mixing both types.
    if 'ICCID' in cols_missing:
        raise ValueError("Table %s stores eUICC key material, refusing to add UICC specific column 'ICCID'" % self.table)
    if 'EID' in cols_missing:
        raise ValueError("Table %s stores UICC key material, refusing to add eUICC specific column 'EID'" % self.table)

    # Add the missing columns to the table (and invalidate the cached column list first)
    self.cols = None
    for c in cols_missing:
        self.cur.execute(SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table.lower()),
                                                                      Identifier(c.lower())))
|
||||||
|
|
||||||
|
def insert_row(self, row:dict[str, str]):
    """
    Insert a new row into the database table.

    Args:
            row : dictionary with the column names and their designated values

    Raises:
            ValueError : in case the row contains columns unknown to the current table scheme
    """

    # Check if the row is compatible with the current table scheme
    cols_expected = list(row.keys())
    cols_missing = self.get_missing_cols(cols_expected)
    if cols_missing != []:
        raise ValueError("table %s has incompatible format, the row %s contains unknown cols %s" %
                         (self.table, str(row), str(cols_missing)))

    # Insert row into database table. Column names are quoted via Identifier, values are passed as
    # bind parameters so psycopg2 takes care of proper escaping.
    row_keys = list(row.keys())
    row_values = list(row.values())
    query = SQL("INSERT INTO {} ({}) VALUES ({});").format(
        Identifier(self.table.lower()),
        SQL(", ").join(Identifier(k.lower()) for k in row_keys),
        SQL(", ").join(SQL("%s") for _ in row_values))
    self.cur.execute(query, row_values)
|
||||||
|
|
||||||
|
def commit(self):
    """Flush all pending inserts/alterations of the current connection to the database."""
    self.conn.commit()
    log.info("Changes to table %s committed!", self.table)
|
||||||
|
|
||||||
|
def open_csv(opts: argparse.Namespace):
    """
    Open the CSV input file and return a csv.DictReader for it, with all field names upper-cased.

    NOTE: the file object is intentionally not closed here -- the returned DictReader reads from it
    lazily while the caller iterates over it.

    Args:
            opts : parsed commandline options (opts.csv holds the CSV file path)
    """
    log.info("CSV file: %s", opts.csv)
    csv_file = open(opts.csv, 'r')
    cr = csv.DictReader(csv_file)
    # An empty CSV file yields a reader without a header row (fieldnames is None) -- catch that early
    # instead of crashing with a TypeError below.
    if not cr.fieldnames:
        raise RuntimeError("could not open DictReader for CSV-File '%s'" % opts.csv)
    cr.fieldnames = [field.upper() for field in cr.fieldnames]
    log.info("CSV file columns: %s", str(cr.fieldnames))
    return cr
|
||||||
|
|
||||||
|
def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase:
    """
    Connect to the database and make sure the target table can take every column of the CSV file.
    Exits the program (exit code 2) on any error.
    """
    try:
        db = CardKeyDatabase(opts.pqsql, opts.table_name, opts.create_table, opts.admin)

        # Check CSV format against table schema, add missing columns
        missing = db.get_missing_cols(cr.fieldnames)
        if missing and (opts.update_columns or opts.create_table):
            log.info("Adding missing columns: %s", str(missing))
            db.add_cols(missing)
            missing = db.get_missing_cols(cr.fieldnames)

        # Make sure the table schema has no missing columns
        if missing:
            log.error("Database table lacks CSV file columns: %s -- import aborted!", missing)
            sys.exit(2)
    except Exception as e:
        log.error(str(e).strip())
        log.error("Database initialization aborted due to error!")
        sys.exit(2)

    return db
|
||||||
|
|
||||||
|
def import_from_csv(db: CardKeyDatabase, cr: csv.DictReader):
    """
    Insert all rows of the CSV file into the database table. Exits the program (exit code 2) on any
    error; in that case nothing has been committed yet.

    Args:
            db : database connection (see CardKeyDatabase)
            cr : CSV reader positioned at the first data row (see open_csv)
    """
    count = 0
    # The try block is hoisted around the whole loop: a failing row aborts the whole import anyway,
    # so per-row exception handling buys nothing.
    try:
        for row in cr:
            db.insert_row(row)
            count += 1
            if count % 100 == 0:
                log.info("CSV file import in progress, %d rows imported...", count)
    except Exception as e:
        log.error(str(e).strip())
        log.error("CSV file import aborted due to error, no datasets committed!")
        sys.exit(2)
    log.info("CSV file import done, %d rows imported", count)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
    # Commandline option parsing
    option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
                                            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
    option_parser.add_argument('--pqsql', metavar='FILE',
                               default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
                               help='Read card data from PostgreSQL database (config file)')
    option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True)
    option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True)
    option_parser.add_argument("--update-columns", help="add missing table columns", action='store_true', default=False)
    option_parser.add_argument("--create-table", action='store_true', help="create new card key table", default=False)
    option_parser.add_argument("--admin", action='store_true', help="perform action as admin", default=False)
    opts = option_parser.parse_args()

    # Logging setup; warnings are highlighted in yellow (ANSI escape sequence)
    PySimLogger.setup(print, {logging.WARN: "\033[33m"})
    if (opts.verbose):
        PySimLogger.set_verbose(True)
        PySimLogger.set_level(logging.DEBUG)

    # Open CSV file
    cr = open_csv(opts)

    # Open database, create initial table, update column scheme
    db = open_db(cr, opts)

    # Progress with import
    # (with --admin only administrative tasks are performed, no data is imported)
    if not opts.admin:
        import_from_csv(db, cr)

    # Commit changes to the database
    db.commit()
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
Retrieving card-individual keys via CardKeyProvider
|
Retrieving card-individual keys via CardKeyProvider
|
||||||
===================================================
|
===================================================
|
||||||
|
|
||||||
When working with a batch of cards, or more than one card in general, it
|
When working with a batch of cards, or more than one card in general, it
|
||||||
@@ -20,9 +20,11 @@ example develop your own CardKeyProvider that queries some kind of
|
|||||||
database for the key material, or that uses a key derivation function to
|
database for the key material, or that uses a key derivation function to
|
||||||
derive card-specific key material from a global master key.
|
derive card-specific key material from a global master key.
|
||||||
|
|
||||||
The only actual CardKeyProvider implementation included in pySim is the
|
pySim already includes two CardKeyProvider implementations. One to retrieve
|
||||||
`CardKeyProviderCsv` which retrieves the key material from a
|
key material from a CSV file (`CardKeyProviderCsv`) and a second one that allows
|
||||||
[potentially encrypted] CSV file.
|
to retrieve the key material from a PostgreSQL database (`CardKeyProviderPgsql`).
|
||||||
|
Both implementations equally implement a column encryption scheme that allows
|
||||||
|
to protect sensitive columns using a *transport key*.
|
||||||
|
|
||||||
|
|
||||||
The CardKeyProviderCsv
|
The CardKeyProviderCsv
|
||||||
@@ -40,11 +42,215 @@ of pySim-shell. If you do not specify a CSV file, pySim will attempt to
|
|||||||
open a CSV file from the default location at
|
open a CSV file from the default location at
|
||||||
`~/.osmocom/pysim/card_data.csv`, and use that, if it exists.
|
`~/.osmocom/pysim/card_data.csv`, and use that, if it exists.
|
||||||
|
|
||||||
|
The `CardKeyProviderCsv` is suitable to manage small amounts of key material
|
||||||
|
locally. However, if your card inventory is very large and the key material
|
||||||
|
must be made available on multiple sites, the `CardKeyProviderPgsql` is the
|
||||||
|
better option.
|
||||||
|
|
||||||
|
|
||||||
|
The CardKeyProviderPgsql
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
With the `CardKeyProviderPgsql` you can use a PostgreSQL database as storage
|
||||||
|
medium. The implementation comes with a CSV importer tool that consumes the
|
||||||
|
same CSV files you would normally use with the `CardKeyProviderCsv`, so you
|
||||||
|
can just use your existing CSV files and import them into the database.
|
||||||
|
|
||||||
|
|
||||||
|
Setting up the database
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
From the perspective of the database, the `CardKeyProviderPsql` has only
|
||||||
|
minimal requirements. You do not have to create any tables in advance. An empty
|
||||||
|
database and at least one user that may create, alter and insert into tables is
|
||||||
|
sufficient. However, for increased reliability and as a protection against
|
||||||
|
incorrect operation, the `CardKeyProviderPsql` supports a hierarchical model
|
||||||
|
with three users (or roles):
|
||||||
|
|
||||||
|
* **admin**:
|
||||||
|
This should be the owner of the database. It is intended to be used for
|
||||||
|
administrative tasks like adding new tables or adding new columns to existing
|
||||||
|
tables. This user should not be used to insert new data into tables or to access
|
||||||
|
data from within pySim-shell using the `CardKeyProviderPsql`
|
||||||
|
|
||||||
|
* **importer**:
|
||||||
|
This user is used when feeding new data into an existing table. It should only
|
||||||
|
be able to insert new rows into existing tables. It should not be used for
|
||||||
|
administrative tasks or to access data from within pySim-shell using the
|
||||||
|
`CardKeyProviderPsql`
|
||||||
|
|
||||||
|
* **reader**:
|
||||||
|
To access data from within pySim shell using the `CardKeyProviderPsql` the
|
||||||
|
reader user is the correct one to use. This user should have no write access
|
||||||
|
to the database or any of the tables.
|
||||||
|
|
||||||
|
|
||||||
|
Creating a config file
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
The default location for the config file is `~/.osmocom/pysim/card_data_pqsql.cfg`
|
||||||
|
The file uses `yaml` syntax and should look like the example below:
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
host: "127.0.0.1"
|
||||||
|
db_name: "my_database"
|
||||||
|
table_names:
|
||||||
|
- "uicc_keys"
|
||||||
|
- "euicc_keys"
|
||||||
|
db_users:
|
||||||
|
admin:
|
||||||
|
name: "my_admin_user"
|
||||||
|
pass: "my_admin_password"
|
||||||
|
importer:
|
||||||
|
name: "my_importer_user"
|
||||||
|
pass: "my_importer_password"
|
||||||
|
reader:
|
||||||
|
name: "my_reader_user"
|
||||||
|
pass: "my_reader_password"
|
||||||
|
|
||||||
|
This file is used by pySim-shell and by the importer tool. Both expect the file
|
||||||
|
in the aforementioned location. In case you want to store the file in a
|
||||||
|
different location you may use the `--pqsql` commandline option to provide a
|
||||||
|
custom config file path.
|
||||||
|
|
||||||
|
The hostname and the database name for the PostgreSQL database is set with the
|
||||||
|
`host` and `db_name` fields. The field `db_users` sets the user names and
|
||||||
|
passwords for each of the aforementioned users (or roles). In case only a single
|
||||||
|
admin user is used, all three entries may be populated with the same user name
|
||||||
|
and password (not recommended)
|
||||||
|
|
||||||
|
The field `table_names` sets the tables that the `CardKeyProviderPsql` shall
|
||||||
|
use to query to locate card key data. You can set up as many tables as you
|
||||||
|
want, `CardKeyProviderPsql` will query them in order, one by one until a
|
||||||
|
matching entry is found.
|
||||||
|
|
||||||
|
NOTE: In case you do not want to disclose the admin and the importer credentials
|
||||||
|
to pySim-shell you may remove those lines. pySim-shell will only require the
|
||||||
|
`reader` entry under `db_users`.
|
||||||
|
|
||||||
|
|
||||||
|
Using the Importer
|
||||||
|
^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Before data can be imported, you must first create a database table. Tables
|
||||||
|
are created with the provided importer tool, which can be found under
|
||||||
|
`contrib/csv-to-pgsql.py`. This tool is used to create the database table and
|
||||||
|
read the data from the provided CSV file into the database.
|
||||||
|
|
||||||
|
As mentioned before, all CSV file formats that work with `CardKeyProviderCsv`
|
||||||
|
may be used. To demonstrate how the import process works, let's assume you want
|
||||||
|
to import a CSV file format that looks like the following example. Let's also
|
||||||
|
assume that you didn't get the Global Platform keys from your card vendor for
|
||||||
|
this batch of UICC cards, so your CSV file lacks the columns for those fields.
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1"
|
||||||
|
"card1","999700000000001","8900000000000000001","0001","1111","11111111","0101","01010101","11111111111111111111111111111111","11111111111111111111111111111111","11111111"
|
||||||
|
"card2","999700000000002","8900000000000000002","0002","2222","22222222","0202","02020202","22222222222222222222222222222222","22222222222222222222222222222222","22222222"
|
||||||
|
"card3","999700000000003","8900000000000000003","0003","3333","22222222","0303","03030303","33333333333333333333333333333333","33333333333333333333333333333333","33333333"
|
||||||
|
|
||||||
|
Since this is your first import, the database still lacks the table. To
|
||||||
|
instruct the importer to create a new table, you may use the `--create-table`
|
||||||
|
option. You also have to pick an appropriate name for the table. Any name may
|
||||||
|
be chosen as long as it contains the string `uicc_keys` or `euicc_keys`,
|
||||||
|
depending on the type of data (`UICC` or `eUICC`) you intend to store in the
|
||||||
|
table. The creation of the table is an administrative task and can only be done
|
||||||
|
with the `admin` user. The `admin` user is selected using the `--admin` switch.
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin
|
||||||
|
INFO: CSV file: ./csv-to-pgsql_example_01.csv
|
||||||
|
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
|
||||||
|
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
|
||||||
|
INFO: Database host: 127.0.0.1
|
||||||
|
INFO: Database name: my_database
|
||||||
|
INFO: Database user: my_admin_user
|
||||||
|
INFO: New database table created: uicc_keys
|
||||||
|
INFO: Database table: uicc_keys
|
||||||
|
INFO: Database table columns: ['ICCID', 'IMSI']
|
||||||
|
INFO: Adding missing columns: ['PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||||
|
INFO: Changes to table uicc_keys committed!
|
||||||
|
|
||||||
|
The importer has created a new table with the name `uicc_keys`. The table is
|
||||||
|
now ready to be filled with data.
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys
|
||||||
|
INFO: CSV file: ./csv-to-pgsql_example_01.csv
|
||||||
|
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
|
||||||
|
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
|
||||||
|
INFO: Database host: 127.0.0.1
|
||||||
|
INFO: Database name: my_database
|
||||||
|
INFO: Database user: my_importer_user
|
||||||
|
INFO: Database table: uicc_keys
|
||||||
|
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||||
|
INFO: CSV file import done, 3 rows imported
|
||||||
|
INFO: Changes to table uicc_keys committed!
|
||||||
|
|
||||||
|
A quick `SELECT * FROM uicc_keys;` at the PostgreSQL console should now display
|
||||||
|
the contents of the CSV file you have fed into the importer.
|
||||||
|
|
||||||
|
Let's now assume that with your next batch of UICC cards your vendor includes
|
||||||
|
the Global Platform keys so your CSV format changes. It may now look like this:
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1","scp02_dek_1","scp02_enc_1","scp02_mac_1"
|
||||||
|
"card4","999700000000004","8900000000000000004","0004","4444","44444444","0404","04040404","44444444444444444444444444444444","44444444444444444444444444444444","44444444","44444444444444444444444444444444","44444444444444444444444444444444","44444444444444444444444444444444"
|
||||||
|
"card5","999700000000005","8900000000000000005","0005","4444","55555555","0505","05050505","55555555555555555555555555555555","55555555555555555555555555555555","55555555","55555555555555555555555555555555","55555555555555555555555555555555","55555555555555555555555555555555"
|
||||||
|
"card6","999700000000006","8900000000000000006","0006","4444","66666666","0606","06060606","66666666666666666666666666666666","66666666666666666666666666666666","66666666","66666666666666666666666666666666","66666666666666666666666666666666","66666666666666666666666666666666"
|
||||||
|
|
||||||
|
When importing data from an updated CSV format the database table also has
|
||||||
|
to be updated. This is done using the `--update-columns` switch. Like when
|
||||||
|
creating new tables, this operation also requires admin privileges, so the
|
||||||
|
`--admin` switch is required again.
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin
|
||||||
|
INFO: CSV file: ./csv-to-pgsql_example_02.csv
|
||||||
|
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
|
||||||
|
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
|
||||||
|
INFO: Database host: 127.0.0.1
|
||||||
|
INFO: Database name: my_database
|
||||||
|
INFO: Database user: my_admin_user
|
||||||
|
INFO: Database table: uicc_keys
|
||||||
|
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||||
|
INFO: Adding missing columns: ['SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
|
||||||
|
INFO: Changes to table uicc_keys committed!
|
||||||
|
|
||||||
|
When the new table columns are added, the import may be continued like the
|
||||||
|
first one:
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys
|
||||||
|
INFO: CSV file: ./csv-to-pgsql_example_02.csv
|
||||||
|
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
|
||||||
|
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
|
||||||
|
INFO: Database host: 127.0.0.1
|
||||||
|
INFO: Database name: my_database
|
||||||
|
INFO: Database user: my_importer_user
|
||||||
|
INFO: Database table: uicc_keys
|
||||||
|
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC', 'SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
|
||||||
|
INFO: CSV file import done, 3 rows imported
|
||||||
|
INFO: Changes to table uicc_keys committed!
|
||||||
|
|
||||||
|
On the PostgreSQL console a `SELECT * FROM uicc_keys;` should now show the
|
||||||
|
imported data with the added columns. All important data should now also be
|
||||||
|
available from within pySim-shell via the `CardKeyProviderPgsql`.
|
||||||
|
|
||||||
|
|
||||||
Column-Level CSV encryption
|
Column-Level CSV encryption
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
---------------------------
|
||||||
|
|
||||||
pySim supports column-level CSV encryption. This feature will make sure
|
pySim supports column-level CSV encryption. This feature will make sure
|
||||||
that your key material is not stored in plaintext in the CSV file.
|
that your key material is not stored in plaintext in the CSV file (or
|
||||||
|
database).
|
||||||
|
|
||||||
The encryption mechanism uses AES in CBC mode. You can use any key
|
The encryption mechanism uses AES in CBC mode. You can use any key
|
||||||
length permitted by AES (128/192/256 bit).
|
length permitted by AES (128/192/256 bit).
|
||||||
@@ -72,6 +278,8 @@ by all columns of the set:
|
|||||||
* `SCP03_ISDA` is a group alias for `SCP03_ENC_ISDA`, `SCP03_MAC_ISDA`, `SCP03_DEK_ISDA`
|
* `SCP03_ISDA` is a group alias for `SCP03_ENC_ISDA`, `SCP03_MAC_ISDA`, `SCP03_DEK_ISDA`
|
||||||
* `SCP03_ISDR` is a group alias for `SCP03_ENC_ISDR`, `SCP03_MAC_ISDR`, `SCP03_DEK_ISDR`
|
* `SCP03_ISDR` is a group alias for `SCP03_ENC_ISDR`, `SCP03_MAC_ISDR`, `SCP03_DEK_ISDR`
|
||||||
|
|
||||||
|
NOTE: When using `CardKeyProviderPgsql`, the input CSV files must be encrypted
|
||||||
|
before import.
|
||||||
|
|
||||||
Field naming
|
Field naming
|
||||||
------------
|
------------
|
||||||
@@ -82,9 +290,9 @@ Field naming
|
|||||||
* For look-up of eUICC specific key material (like SCP03 keys for the
|
* For look-up of eUICC specific key material (like SCP03 keys for the
|
||||||
ISD-R, ECASD), pySim uses the `EID` field as lookup key.
|
ISD-R, ECASD), pySim uses the `EID` field as lookup key.
|
||||||
|
|
||||||
As soon as the CardKeyProviderCsv finds a line (row) in your CSV where
|
As soon as the CardKeyProvider finds a line (row) in your CSV file
|
||||||
the ICCID or EID match, it looks for the column containing the requested
|
(or database) where the ICCID or EID match, it looks for the column containing
|
||||||
data.
|
the requested data.
|
||||||
|
|
||||||
|
|
||||||
ADM PIN
|
ADM PIN
|
||||||
|
|||||||
@@ -1,2 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
#!/bin/sh -e
|
|
||||||
set -x
|
|
||||||
cd "$(dirname "$0")"
|
|
||||||
ruff check .
|
|
||||||
@@ -69,7 +69,7 @@ from pySim.ts_102_222 import Ts102222Commands
|
|||||||
from pySim.gsm_r import DF_EIRENE
|
from pySim.gsm_r import DF_EIRENE
|
||||||
from pySim.cat import ProactiveCommand
|
from pySim.cat import ProactiveCommand
|
||||||
|
|
||||||
from pySim.card_key_provider import CardKeyProviderCsv
|
from pySim.card_key_provider import CardKeyProviderCsv, CardKeyProviderPgsql
|
||||||
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
||||||
|
|
||||||
from pySim.app import init_card
|
from pySim.app import init_card
|
||||||
@@ -1140,6 +1140,9 @@ card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
|||||||
card_key_group.add_argument('--csv', metavar='FILE',
|
card_key_group.add_argument('--csv', metavar='FILE',
|
||||||
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
||||||
help='Read card data from CSV file')
|
help='Read card data from CSV file')
|
||||||
|
card_key_group.add_argument('--pqsql', metavar='FILE',
|
||||||
|
default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
|
||||||
|
help='Read card data from PostgreSQL database (config file)')
|
||||||
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
help=argparse.SUPPRESS, dest='column_key')
|
help=argparse.SUPPRESS, dest='column_key')
|
||||||
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
@@ -1179,6 +1182,8 @@ if __name__ == '__main__':
|
|||||||
column_keys[name] = key
|
column_keys[name] = key
|
||||||
if os.path.isfile(opts.csv):
|
if os.path.isfile(opts.csv):
|
||||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
||||||
|
if os.path.isfile(opts.pqsql):
|
||||||
|
card_key_provider_register(CardKeyProviderPgsql(opts.pqsql, column_keys))
|
||||||
|
|
||||||
# Init card reader driver
|
# Init card reader driver
|
||||||
sl = init_reader(opts, proactive_handler = Proact())
|
sl = init_reader(opts, proactive_handler = Proact())
|
||||||
|
|||||||
@@ -36,6 +36,9 @@ from pySim.log import PySimLogger
|
|||||||
import abc
|
import abc
|
||||||
import csv
|
import csv
|
||||||
import logging
|
import logging
|
||||||
|
import yaml
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2.sql import Identifier, SQL
|
||||||
|
|
||||||
log = PySimLogger.get("CARDKEY")
|
log = PySimLogger.get("CARDKEY")
|
||||||
|
|
||||||
@@ -159,6 +162,7 @@ class CardKeyProviderCsv(CardKeyProvider):
|
|||||||
csv_filename : file name (path) of CSV file containing card-individual key/data
|
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||||
transport_keys : (see class CardKeyFieldCryptor)
|
transport_keys : (see class CardKeyFieldCryptor)
|
||||||
"""
|
"""
|
||||||
|
log.info("Using CSV file as card key data source: %s" % csv_filename)
|
||||||
self.csv_file = open(csv_filename, 'r')
|
self.csv_file = open(csv_filename, 'r')
|
||||||
if not self.csv_file:
|
if not self.csv_file:
|
||||||
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||||
@@ -186,6 +190,66 @@ class CardKeyProviderCsv(CardKeyProvider):
|
|||||||
return None
|
return None
|
||||||
return return_dict
|
return return_dict
|
||||||
|
|
||||||
|
class CardKeyProviderPgsql(CardKeyProvider):
|
||||||
|
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
|
||||||
|
|
||||||
|
def __init__(self, config_filename: str, transport_keys: dict):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
config_filename : file name (path) of CSV file containing card-individual key/data
|
||||||
|
transport_keys : (see class CardKeyFieldCryptor)
|
||||||
|
"""
|
||||||
|
log.info("Using SQL database as card key data source: %s" % config_filename)
|
||||||
|
with open(config_filename, "r") as cfg:
|
||||||
|
config = yaml.load(cfg, Loader=yaml.FullLoader)
|
||||||
|
log.info("Card key database name: %s" % config.get('db_name'))
|
||||||
|
db_users = config.get('db_users')
|
||||||
|
user = db_users.get('reader')
|
||||||
|
if user is None:
|
||||||
|
raise ValueError("user for role 'reader' not set up in config file.")
|
||||||
|
self.conn = psycopg2.connect(dbname=config.get('db_name'),
|
||||||
|
user=user.get('name'),
|
||||||
|
password=user.get('pass'),
|
||||||
|
host=config.get('host'))
|
||||||
|
self.tables = config.get('table_names')
|
||||||
|
log.info("Card key database tables: %s" % str(self.tables))
|
||||||
|
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||||
|
|
||||||
|
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||||
|
db_result = None
|
||||||
|
for t in self.tables:
|
||||||
|
self.conn.rollback()
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
|
||||||
|
# Make sure that the database table and the key column actually exists. If not, move on to the next table
|
||||||
|
cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (t,))
|
||||||
|
cols_result = cur.fetchall()
|
||||||
|
if cols_result == []:
|
||||||
|
log.warning("Card Key database seems to lack table %s, check config file!" % t)
|
||||||
|
continue
|
||||||
|
if (key.lower(),) not in cols_result:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Query requested columns from database table
|
||||||
|
query = SQL("SELECT {}").format(Identifier(fields[0].lower()))
|
||||||
|
for f in fields[1:]:
|
||||||
|
query += SQL(", {}").format(Identifier(f.lower()))
|
||||||
|
query += SQL(" FROM {} WHERE {} = %s LIMIT 1;").format(Identifier(t.lower()),
|
||||||
|
Identifier(key.lower()))
|
||||||
|
cur.execute(query, (value,))
|
||||||
|
db_result = cur.fetchone()
|
||||||
|
cur.close()
|
||||||
|
|
||||||
|
if db_result:
|
||||||
|
break
|
||||||
|
|
||||||
|
if db_result is None:
|
||||||
|
return None
|
||||||
|
result = dict(zip(fields, db_result))
|
||||||
|
|
||||||
|
for k in result.keys():
|
||||||
|
result[k] = self.crypt.decrypt_field(k, result.get(k))
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ class File:
|
|||||||
self.file_type = template.file_type
|
self.file_type = template.file_type
|
||||||
self.fid = template.fid
|
self.fid = template.fid
|
||||||
self.sfi = template.sfi
|
self.sfi = template.sfi
|
||||||
self.arr = template.arr.to_bytes(1)
|
self.arr = template.arr.to_bytes(1, 'big')
|
||||||
if hasattr(template, 'rec_len'):
|
if hasattr(template, 'rec_len'):
|
||||||
self.rec_len = template.rec_len
|
self.rec_len = template.rec_len
|
||||||
else:
|
else:
|
||||||
@@ -227,7 +227,7 @@ class File:
|
|||||||
fileDescriptor['shortEFID'] = bytes([self.sfi])
|
fileDescriptor['shortEFID'] = bytes([self.sfi])
|
||||||
if self.df_name:
|
if self.df_name:
|
||||||
fileDescriptor['dfName'] = self.df_name
|
fileDescriptor['dfName'] = self.df_name
|
||||||
if self.arr and self.arr != self.template.arr.to_bytes(1):
|
if self.arr and self.arr != self.template.arr.to_bytes(1, 'big'):
|
||||||
fileDescriptor['securityAttributesReferenced'] = self.arr
|
fileDescriptor['securityAttributesReferenced'] = self.arr
|
||||||
if self.file_type in ['LF', 'CY']:
|
if self.file_type in ['LF', 'CY']:
|
||||||
fdb_dec['file_type'] = 'working_ef'
|
fdb_dec['file_type'] = 'working_ef'
|
||||||
@@ -264,7 +264,7 @@ class File:
|
|||||||
if self.read_and_update_when_deact:
|
if self.read_and_update_when_deact:
|
||||||
spfi |= 0x40 # TS 102 222 Table 5
|
spfi |= 0x40 # TS 102 222 Table 5
|
||||||
if spfi != 0x00:
|
if spfi != 0x00:
|
||||||
pefi['specialFileInformation'] = spfi.to_bytes(1)
|
pefi['specialFileInformation'] = spfi.to_bytes(1, 'big')
|
||||||
if self.fill_pattern:
|
if self.fill_pattern:
|
||||||
if not self.fill_pattern_repeat:
|
if not self.fill_pattern_repeat:
|
||||||
pefi['fillPattern'] = self.fill_pattern
|
pefi['fillPattern'] = self.fill_pattern
|
||||||
@@ -1006,13 +1006,6 @@ class SecurityDomainKey:
|
|||||||
'keyVersionNumber': bytes([self.key_version_number]),
|
'keyVersionNumber': bytes([self.key_version_number]),
|
||||||
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
||||||
|
|
||||||
def get_key_component(self, key_type):
|
|
||||||
for kc in self.key_components:
|
|
||||||
if kc.key_type == key_type:
|
|
||||||
return kc.key_data
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
class ProfileElementSD(ProfileElement):
|
class ProfileElementSD(ProfileElement):
|
||||||
"""Class representing a securityDomain ProfileElement."""
|
"""Class representing a securityDomain ProfileElement."""
|
||||||
type = 'securityDomain'
|
type = 'securityDomain'
|
||||||
|
|||||||
@@ -1,237 +0,0 @@
|
|||||||
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
|
|
||||||
#
|
|
||||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
|
||||||
#
|
|
||||||
# Author: nhofmeyr@sysmocom.de
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU Affero General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
import secrets
|
|
||||||
import re
|
|
||||||
from pySim.utils import all_subclasses_of
|
|
||||||
from osmocom.utils import b2h
|
|
||||||
|
|
||||||
class ParamSourceExn(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ParamSourceExhaustedExn(ParamSourceExn):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ParamSourceUndefinedExn(ParamSourceExn):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ParamSource:
|
|
||||||
'abstract parameter source. For usage, see personalization.BatchPersonalization.'
|
|
||||||
is_abstract = True
|
|
||||||
|
|
||||||
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
|
|
||||||
name = 'none'
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_all_implementations(cls, blacklist=None):
|
|
||||||
"return all subclasses of ParamSource that have is_abstract = False."
|
|
||||||
# return a set() so that multiple inheritance does not return dups
|
|
||||||
return set(c
|
|
||||||
for c in all_subclasses_of(cls)
|
|
||||||
if (not c.is_abstract) and ((not blacklist) or (c not in blacklist))
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_str(cls, s:str):
|
|
||||||
'''Subclasses implement this:
|
|
||||||
if a parameter source defines some string input magic, override this function.
|
|
||||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
|
||||||
so the user can enter '0000' to get a four digit random number.'''
|
|
||||||
return cls(s)
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
'''Subclasses implement this: return the next value from the parameter source.
|
|
||||||
When there are no more values from the source, raise a ParamSourceExhaustedExn.'''
|
|
||||||
raise ParamSourceExhaustedExn()
|
|
||||||
|
|
||||||
|
|
||||||
class ConstantSource(ParamSource):
|
|
||||||
'one value for all'
|
|
||||||
is_abstract = False
|
|
||||||
name = 'constant'
|
|
||||||
|
|
||||||
def __init__(self, val:str):
|
|
||||||
self.val = val
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
return self.val
|
|
||||||
|
|
||||||
class InputExpandingParamSource(ParamSource):
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def expand_str(cls, s:str):
|
|
||||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
|
||||||
if '*' not in s:
|
|
||||||
return s
|
|
||||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
|
||||||
if len(tokens) < 3:
|
|
||||||
return s
|
|
||||||
parts = []
|
|
||||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
|
||||||
parts.append(unchanged)
|
|
||||||
repeat = int(repeat_str)
|
|
||||||
parts.append(snippet * repeat)
|
|
||||||
return ''.join(parts)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_str(cls, s:str):
|
|
||||||
return cls(cls.expand_str(s))
|
|
||||||
|
|
||||||
class RandomSourceMixin:
|
|
||||||
random_impl = secrets.SystemRandom()
|
|
||||||
|
|
||||||
class RandomDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
|
||||||
'return a different sequence of random decimal digits each'
|
|
||||||
is_abstract = False
|
|
||||||
name = 'random decimal digits'
|
|
||||||
used_keys = set()
|
|
||||||
|
|
||||||
def __init__(self, num_digits, first_value, last_value):
|
|
||||||
"""
|
|
||||||
See also from_str().
|
|
||||||
|
|
||||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
|
||||||
num_digits: number of random digits (possibly with leading zeros) to generate.
|
|
||||||
first_value, last_value: the decimal range in which to provide random digits.
|
|
||||||
"""
|
|
||||||
num_digits = int(num_digits)
|
|
||||||
first_value = int(first_value)
|
|
||||||
last_value = int(last_value)
|
|
||||||
assert num_digits > 0
|
|
||||||
assert first_value <= last_value
|
|
||||||
self.num_digits = num_digits
|
|
||||||
self.val_first_last = (first_value, last_value)
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
# try to generate random digits that are always different from previously produced random bytes
|
|
||||||
attempts = 10
|
|
||||||
while True:
|
|
||||||
val = self.random_impl.randint(*self.val_first_last)
|
|
||||||
if val in RandomDigitSource.used_keys:
|
|
||||||
attempts -= 1
|
|
||||||
if attempts:
|
|
||||||
continue
|
|
||||||
RandomDigitSource.used_keys.add(val)
|
|
||||||
break
|
|
||||||
return self.val_to_digit(val)
|
|
||||||
|
|
||||||
def val_to_digit(self, val:int):
|
|
||||||
return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_str(cls, s:str):
|
|
||||||
s = cls.expand_str(s)
|
|
||||||
|
|
||||||
if '..' in s:
|
|
||||||
first_str, last_str = s.split('..')
|
|
||||||
first_str = first_str.strip()
|
|
||||||
last_str = last_str.strip()
|
|
||||||
else:
|
|
||||||
first_str = s.strip()
|
|
||||||
last_str = None
|
|
||||||
|
|
||||||
first_value = int(first_str)
|
|
||||||
last_value = int(last_str) if last_str is not None else '9' * len(first_str)
|
|
||||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
|
||||||
|
|
||||||
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
|
||||||
'return a different sequence of random hexadecimal digits each'
|
|
||||||
is_abstract = False
|
|
||||||
name = 'random hexadecimal digits'
|
|
||||||
used_keys = set()
|
|
||||||
|
|
||||||
def __init__(self, num_digits):
|
|
||||||
'see from_str()'
|
|
||||||
num_digits = int(num_digits)
|
|
||||||
if num_digits < 1:
|
|
||||||
raise ValueError('zero number of digits')
|
|
||||||
# hex digits always come in two
|
|
||||||
if (num_digits & 1) != 0:
|
|
||||||
raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}')
|
|
||||||
self.num_digits = num_digits
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
# try to generate random bytes that are always different from previously produced random bytes
|
|
||||||
attempts = 10
|
|
||||||
while True:
|
|
||||||
val = self.random_impl.randbytes(self.num_digits // 2)
|
|
||||||
if val in RandomHexDigitSource.used_keys:
|
|
||||||
attempts -= 1
|
|
||||||
if attempts:
|
|
||||||
continue
|
|
||||||
RandomHexDigitSource.used_keys.add(val)
|
|
||||||
break
|
|
||||||
|
|
||||||
return b2h(val)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_str(cls, s:str):
|
|
||||||
s = cls.expand_str(s)
|
|
||||||
return cls(num_digits=len(s.strip()))
|
|
||||||
|
|
||||||
class IncDigitSource(RandomDigitSource):
|
|
||||||
'incrementing sequence of digits'
|
|
||||||
is_abstract = False
|
|
||||||
name = 'incrementing decimal digits'
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
"The arguments defining the number of digits and value range are identical to RandomDigitSource.__init__()."
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.next_val = None
|
|
||||||
self.reset()
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
"Restart from the first value of the defined range passed to __init__()."
|
|
||||||
self.next_val = self.val_first_last[0]
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
val = self.next_val
|
|
||||||
if val is None:
|
|
||||||
raise ParamSourceExhaustedExn()
|
|
||||||
|
|
||||||
returnval = self.val_to_digit(val)
|
|
||||||
|
|
||||||
val += 1
|
|
||||||
if val > self.val_first_last[1]:
|
|
||||||
self.next_val = None
|
|
||||||
else:
|
|
||||||
self.next_val = val
|
|
||||||
|
|
||||||
return returnval
|
|
||||||
|
|
||||||
class CsvSource(ParamSource):
|
|
||||||
'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)'
|
|
||||||
is_abstract = False
|
|
||||||
name = 'from CSV'
|
|
||||||
|
|
||||||
def __init__(self, csv_column):
|
|
||||||
"""
|
|
||||||
csv_column: column name indicating the column to use for this parameter.
|
|
||||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
|
||||||
CsvSource picks the column with the name matching csv_column.
|
|
||||||
"""
|
|
||||||
self.csv_column = csv_column
|
|
||||||
|
|
||||||
def get_next(self, csv_row:dict=None):
|
|
||||||
val = None
|
|
||||||
if csv_row:
|
|
||||||
val = csv_row.get(self.csv_column)
|
|
||||||
if not val:
|
|
||||||
raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}')
|
|
||||||
return val
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -103,26 +103,6 @@ class CheckBasicStructure(ProfileConstraintChecker):
|
|||||||
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
|
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
|
||||||
raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
|
raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
|
||||||
|
|
||||||
def check_mandatory_services_aka(self, pes: ProfileElementSequence):
|
|
||||||
"""Ensure that no unnecessary authentication related services are marked as mandatory but not
|
|
||||||
actually used within the profile"""
|
|
||||||
m_svcs = pes.get_pe_for_type('header').decoded['eUICC-Mandatory-services']
|
|
||||||
# list of tuples (algo_id, key_len_in_octets) for all the akaParameters in the PE Sequence
|
|
||||||
algo_id_klen = [(x.decoded['algoConfiguration'][1]['algorithmID'],
|
|
||||||
len(x.decoded['algoConfiguration'][1]['key'])) for x in pes.get_pes_for_type('akaParameter')]
|
|
||||||
# just a plain list of algorithm IDs in akaParameters
|
|
||||||
algorithm_ids = [x[0] for x in algo_id_klen]
|
|
||||||
if 'milenage' in m_svcs and not 1 in algorithm_ids:
|
|
||||||
raise ProfileError('milenage mandatory, but no related algorithm_id in akaParameter')
|
|
||||||
if 'tuak128' in m_svcs and not (2, 128/8) in algo_id_klen:
|
|
||||||
raise ProfileError('tuak128 mandatory, but no related algorithm_id in akaParameter')
|
|
||||||
if 'cave' in m_svcs and not pes.get_pe_for_type('cdmaParameter'):
|
|
||||||
raise ProfileError('cave mandatory, but no related cdmaParameter')
|
|
||||||
if 'tuak256' in m_svcs and (2, 256/8) in algo_id_klen:
|
|
||||||
raise ProfileError('tuak256 mandatory, but no related algorithm_id in akaParameter')
|
|
||||||
if 'usim-test-algorithm' in m_svcs and not 3 in algorithm_ids:
|
|
||||||
raise ProfileError('usim-test-algorithm mandatory, but no related algorithm_id in akaParameter')
|
|
||||||
|
|
||||||
def check_identification_unique(self, pes: ProfileElementSequence):
|
def check_identification_unique(self, pes: ProfileElementSequence):
|
||||||
"""Ensure that each PE has a unique identification value."""
|
"""Ensure that each PE has a unique identification value."""
|
||||||
id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header]
|
id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header]
|
||||||
|
|||||||
@@ -91,7 +91,6 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
|||||||
|
|
||||||
# Key Usage:
|
# Key Usage:
|
||||||
# KVN 0x01 .. 0x0F reserved for SCP80
|
# KVN 0x01 .. 0x0F reserved for SCP80
|
||||||
# KVN 0x81 .. 0x8f reserved for SCP81
|
|
||||||
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
||||||
# KVN 0x20 .. 0x2F reserved for SCP02
|
# KVN 0x20 .. 0x2F reserved for SCP02
|
||||||
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
||||||
|
|||||||
@@ -108,7 +108,10 @@ class PySimLogger:
|
|||||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
||||||
color = PySimLogger.colors.get(record.levelno)
|
color = PySimLogger.colors.get(record.levelno)
|
||||||
if color:
|
if color:
|
||||||
PySimLogger.print_callback(style(formatted_message, fg = color))
|
if type(color) is str:
|
||||||
|
PySimLogger.print_callback(color + formatted_message + "\033[0m")
|
||||||
|
else:
|
||||||
|
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||||
else:
|
else:
|
||||||
PySimLogger.print_callback(formatted_message)
|
PySimLogger.print_callback(formatted_message)
|
||||||
|
|
||||||
|
|||||||
@@ -267,11 +267,11 @@ class EF_SMSP(LinFixedEF):
|
|||||||
raise ValueError
|
raise ValueError
|
||||||
def _encode(self, obj, context, path):
|
def _encode(self, obj, context, path):
|
||||||
if obj <= 12*60:
|
if obj <= 12*60:
|
||||||
return obj/5 - 1
|
return obj // 5 - 1
|
||||||
elif obj <= 24*60:
|
elif obj <= 24*60:
|
||||||
return 143 + ((obj - (12 * 60)) // 30)
|
return 143 + ((obj - (12 * 60)) // 30)
|
||||||
elif obj <= 30 * 24 * 60:
|
elif obj <= 30 * 24 * 60:
|
||||||
return 166 + (obj / (24 * 60))
|
return 166 + (obj // (24 * 60))
|
||||||
elif obj <= 63 * 7 * 24 * 60:
|
elif obj <= 63 * 7 * 24 * 60:
|
||||||
return 192 + (obj // (7 * 24 * 60))
|
return 192 + (obj // (7 * 24 * 60))
|
||||||
else:
|
else:
|
||||||
@@ -280,7 +280,7 @@ class EF_SMSP(LinFixedEF):
|
|||||||
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
|
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
|
||||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
|
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
|
||||||
ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
|
ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
|
||||||
self._construct = Struct('alpha_id'/COptional(GsmStringAdapter(Rpad(Bytes(this._.total_len-28)))),
|
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
|
||||||
'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
|
'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
|
||||||
tp_pid=3, tp_dcs=4, tp_vp=5)),
|
tp_pid=3, tp_dcs=4, tp_vp=5)),
|
||||||
'tp_dest_addr'/ScAddr,
|
'tp_dest_addr'/ScAddr,
|
||||||
|
|||||||
@@ -1109,9 +1109,3 @@ class CardCommandSet:
|
|||||||
if cla and not cmd.match_cla(cla):
|
if cla and not cmd.match_cla(cla):
|
||||||
return None
|
return None
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
|
|
||||||
def all_subclasses_of(cls):
|
|
||||||
for subc in cls.__subclasses__():
|
|
||||||
yield subc
|
|
||||||
yield from all_subclasses_of(subc)
|
|
||||||
|
|||||||
@@ -1,2 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
|
||||||
@@ -15,3 +15,4 @@ git+https://github.com/osmocom/asn1tools
|
|||||||
packaging
|
packaging
|
||||||
git+https://github.com/hologram-io/smpp.pdu
|
git+https://github.com/hologram-io/smpp.pdu
|
||||||
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
||||||
|
psycopg2-binary
|
||||||
|
|||||||
1
setup.py
1
setup.py
@@ -34,6 +34,7 @@ setup(
|
|||||||
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
|
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
|
||||||
"asn1tools",
|
"asn1tools",
|
||||||
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
|
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
|
||||||
|
"psycopg2-binary"
|
||||||
],
|
],
|
||||||
scripts=[
|
scripts=[
|
||||||
'pySim-prog.py',
|
'pySim-prog.py',
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
../../smdpp-data
|
|
||||||
@@ -1,399 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
|
||||||
#
|
|
||||||
# Author: Neels Hofmeyr
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 2 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
import io
|
|
||||||
import sys
|
|
||||||
import unittest
|
|
||||||
import io
|
|
||||||
from importlib import resources
|
|
||||||
from osmocom.utils import hexstr
|
|
||||||
from pySim.esim.saip import ProfileElementSequence
|
|
||||||
import pySim.esim.saip.personalization as p13n
|
|
||||||
import smdpp_data.upp
|
|
||||||
|
|
||||||
import xo
|
|
||||||
update_expected_output = False
|
|
||||||
|
|
||||||
def valstr(val):
|
|
||||||
if isinstance(val, io.BytesIO):
|
|
||||||
val = val.getvalue()
|
|
||||||
if isinstance(val, bytearray):
|
|
||||||
val = bytes(val)
|
|
||||||
return f'{val!r}'
|
|
||||||
|
|
||||||
def valtypestr(val):
|
|
||||||
if isinstance(val, dict):
|
|
||||||
types = []
|
|
||||||
for v in val.values():
|
|
||||||
types.append(f'{type(v).__name__}')
|
|
||||||
|
|
||||||
val_type = '{' + ', '.join(types) + '}'
|
|
||||||
else:
|
|
||||||
val_type = f'{type(val).__name__}'
|
|
||||||
return f'{valstr(val)}:{val_type}'
|
|
||||||
|
|
||||||
class D:
|
|
||||||
mandatory = set()
|
|
||||||
optional = set()
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
|
||||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
|
||||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
|
||||||
for k, v in kwargs.items():
|
|
||||||
setattr(self, k, v)
|
|
||||||
for k in self.optional:
|
|
||||||
if not hasattr(self, k):
|
|
||||||
setattr(self, k, None)
|
|
||||||
|
|
||||||
class ConfigurableParameterTest(unittest.TestCase):
|
|
||||||
|
|
||||||
def test_parameters(self):
|
|
||||||
|
|
||||||
upp_fnames = (
|
|
||||||
'TS48v5_SAIP2.1A_NoBERTLV.der',
|
|
||||||
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
|
|
||||||
'TS48v5_SAIP2.1B_NoBERTLV.der',
|
|
||||||
'TS48v5_SAIP2.3_NoBERTLV.der',
|
|
||||||
)
|
|
||||||
|
|
||||||
class Paramtest(D):
|
|
||||||
mandatory = (
|
|
||||||
'param_cls',
|
|
||||||
'val',
|
|
||||||
'expect_val',
|
|
||||||
)
|
|
||||||
optional = (
|
|
||||||
'expect_clean_val',
|
|
||||||
)
|
|
||||||
|
|
||||||
param_tests = [
|
|
||||||
Paramtest(param_cls=p13n.Imsi, val='123456',
|
|
||||||
expect_clean_val=str('123456'),
|
|
||||||
expect_val={'IMSI': hexstr('123456'),
|
|
||||||
'IMSI-ACC': '0040'}),
|
|
||||||
Paramtest(param_cls=p13n.Imsi, val=int(123456),
|
|
||||||
expect_val={'IMSI': hexstr('123456'),
|
|
||||||
'IMSI-ACC': '0040'}),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
|
|
||||||
expect_clean_val=str('123456789012345'),
|
|
||||||
expect_val={'IMSI': hexstr('123456789012345'),
|
|
||||||
'IMSI-ACC': '0020'}),
|
|
||||||
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
|
|
||||||
expect_val={'IMSI': hexstr('123456789012345'),
|
|
||||||
'IMSI-ACC': '0020'}),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Puk1,
|
|
||||||
val='12345678',
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
Paramtest(param_cls=p13n.Puk1,
|
|
||||||
val=int(12345678),
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Puk2,
|
|
||||||
val='12345678',
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val='1234',
|
|
||||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
|
||||||
expect_val='1234'),
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val='123456',
|
|
||||||
expect_clean_val=b'123456\xff\xff',
|
|
||||||
expect_val='123456'),
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val='12345678',
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val=int(1234),
|
|
||||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
|
||||||
expect_val='1234'),
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val=int(123456),
|
|
||||||
expect_clean_val=b'123456\xff\xff',
|
|
||||||
expect_val='123456'),
|
|
||||||
Paramtest(param_cls=p13n.Pin1,
|
|
||||||
val=int(12345678),
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Adm1,
|
|
||||||
val='1234',
|
|
||||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
|
||||||
expect_val='1234'),
|
|
||||||
Paramtest(param_cls=p13n.Adm1,
|
|
||||||
val='123456',
|
|
||||||
expect_clean_val=b'123456\xff\xff',
|
|
||||||
expect_val='123456'),
|
|
||||||
Paramtest(param_cls=p13n.Adm1,
|
|
||||||
val='12345678',
|
|
||||||
expect_clean_val=b'12345678',
|
|
||||||
expect_val='12345678'),
|
|
||||||
Paramtest(param_cls=p13n.Adm1,
|
|
||||||
val=int(123456),
|
|
||||||
expect_clean_val=b'123456\xff\xff',
|
|
||||||
expect_val='123456'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val='Milenage',
|
|
||||||
expect_clean_val=1,
|
|
||||||
expect_val='Milenage'),
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val='TUAK',
|
|
||||||
expect_clean_val=2,
|
|
||||||
expect_val='TUAK'),
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val='usim-test',
|
|
||||||
expect_clean_val=3,
|
|
||||||
expect_val='usim-test'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val=1,
|
|
||||||
expect_clean_val=1,
|
|
||||||
expect_val='Milenage'),
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val=2,
|
|
||||||
expect_clean_val=2,
|
|
||||||
expect_val='TUAK'),
|
|
||||||
Paramtest(param_cls=p13n.AlgorithmID,
|
|
||||||
val=3,
|
|
||||||
expect_clean_val=3,
|
|
||||||
expect_val='usim-test'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.K,
|
|
||||||
val='01020304050607080910111213141516',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.K,
|
|
||||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.K,
|
|
||||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.K,
|
|
||||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.K,
|
|
||||||
val=int(11020304050607080910111213141516),
|
|
||||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='11020304050607080910111213141516'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=p13n.Opc,
|
|
||||||
val='01020304050607080910111213141516',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.Opc,
|
|
||||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.Opc,
|
|
||||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
Paramtest(param_cls=p13n.Opc,
|
|
||||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516'),
|
|
||||||
]
|
|
||||||
|
|
||||||
for sdkey_cls in (
|
|
||||||
# thin out the number of tests, as a compromise between completeness and test runtime
|
|
||||||
p13n.SdKeyScp80Kvn01Enc,
|
|
||||||
#p13n.SdKeyScp80Kvn01Dek,
|
|
||||||
#p13n.SdKeyScp80Kvn01Mac,
|
|
||||||
#p13n.SdKeyScp80Kvn02Enc,
|
|
||||||
p13n.SdKeyScp80Kvn02Dek,
|
|
||||||
#p13n.SdKeyScp80Kvn02Mac,
|
|
||||||
#p13n.SdKeyScp81Kvn81Enc,
|
|
||||||
#p13n.SdKeyScp81Kvn81Dek,
|
|
||||||
p13n.SdKeyScp81Kvn81Mac,
|
|
||||||
#p13n.SdKeyScp81Kvn82Enc,
|
|
||||||
#p13n.SdKeyScp81Kvn82Dek,
|
|
||||||
#p13n.SdKeyScp81Kvn82Mac,
|
|
||||||
p13n.SdKeyScp81Kvn83Enc,
|
|
||||||
#p13n.SdKeyScp81Kvn83Dek,
|
|
||||||
#p13n.SdKeyScp81Kvn83Mac,
|
|
||||||
#p13n.SdKeyScp02Kvn20Enc,
|
|
||||||
p13n.SdKeyScp02Kvn20Dek,
|
|
||||||
#p13n.SdKeyScp02Kvn20Mac,
|
|
||||||
#p13n.SdKeyScp02Kvn21Enc,
|
|
||||||
#p13n.SdKeyScp02Kvn21Dek,
|
|
||||||
p13n.SdKeyScp02Kvn21Mac,
|
|
||||||
#p13n.SdKeyScp02Kvn22Enc,
|
|
||||||
#p13n.SdKeyScp02Kvn22Dek,
|
|
||||||
#p13n.SdKeyScp02Kvn22Mac,
|
|
||||||
p13n.SdKeyScp02KvnffEnc,
|
|
||||||
#p13n.SdKeyScp02KvnffDek,
|
|
||||||
#p13n.SdKeyScp02KvnffMac,
|
|
||||||
#p13n.SdKeyScp03Kvn30Enc,
|
|
||||||
p13n.SdKeyScp03Kvn30Dek,
|
|
||||||
#p13n.SdKeyScp03Kvn30Mac,
|
|
||||||
#p13n.SdKeyScp03Kvn31Enc,
|
|
||||||
#p13n.SdKeyScp03Kvn31Dek,
|
|
||||||
p13n.SdKeyScp03Kvn31Mac,
|
|
||||||
#p13n.SdKeyScp03Kvn32Enc,
|
|
||||||
#p13n.SdKeyScp03Kvn32Dek,
|
|
||||||
#p13n.SdKeyScp03Kvn32Mac,
|
|
||||||
):
|
|
||||||
|
|
||||||
param_tests.extend([
|
|
||||||
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val='01020304050607080910111213141516',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516',
|
|
||||||
),
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val='010203040506070809101112131415161718192021222324',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
|
||||||
b'\x17\x18\x19\x20\x21\x22\x23\x24',
|
|
||||||
expect_val='010203040506070809101112131415161718192021222324'),
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val='0102030405060708091011121314151617181920212223242526272829303132',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
|
||||||
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32',
|
|
||||||
expect_val='0102030405060708091011121314151617181920212223242526272829303132'),
|
|
||||||
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516',
|
|
||||||
),
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516',
|
|
||||||
),
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
|
||||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='01020304050607080910111213141516',
|
|
||||||
),
|
|
||||||
Paramtest(param_cls=sdkey_cls,
|
|
||||||
val=11020304050607080910111213141516,
|
|
||||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
|
||||||
expect_val='11020304050607080910111213141516',
|
|
||||||
),
|
|
||||||
])
|
|
||||||
|
|
||||||
outputs = []
|
|
||||||
|
|
||||||
for upp_fname in upp_fnames:
|
|
||||||
test_idx = -1
|
|
||||||
try:
|
|
||||||
|
|
||||||
der = resources.read_binary(smdpp_data.upp, upp_fname)
|
|
||||||
|
|
||||||
for t in param_tests:
|
|
||||||
test_idx += 1
|
|
||||||
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
|
|
||||||
|
|
||||||
param = None
|
|
||||||
try:
|
|
||||||
param = t.param_cls()
|
|
||||||
param.input_value = t.val
|
|
||||||
param.validate()
|
|
||||||
except ValueError as e:
|
|
||||||
raise ValueError(f'{logloc}: {e}') from e
|
|
||||||
|
|
||||||
clean_val = param.value
|
|
||||||
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
|
|
||||||
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
|
|
||||||
raise ValueError(f'{logloc}: expected'
|
|
||||||
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
|
|
||||||
|
|
||||||
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
|
|
||||||
# pes = copy.deepcopy(orig_pes)
|
|
||||||
pes = ProfileElementSequence.from_der(der)
|
|
||||||
try:
|
|
||||||
param.apply(pes)
|
|
||||||
except ValueError as e:
|
|
||||||
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
|
|
||||||
|
|
||||||
changed_der = pes.to_der()
|
|
||||||
|
|
||||||
pes2 = ProfileElementSequence.from_der(changed_der)
|
|
||||||
|
|
||||||
read_back_val = t.param_cls.get_value_from_pes(pes2)
|
|
||||||
|
|
||||||
# compose log string to show the precise type of dict values
|
|
||||||
if isinstance(read_back_val, dict):
|
|
||||||
types = set()
|
|
||||||
for v in read_back_val.values():
|
|
||||||
types.add(f'{type(v).__name__}')
|
|
||||||
|
|
||||||
read_back_val_type = '{' + ', '.join(types) + '}'
|
|
||||||
else:
|
|
||||||
read_back_val_type = f'{type(read_back_val).__name__}'
|
|
||||||
|
|
||||||
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
|
|
||||||
|
|
||||||
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
|
|
||||||
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
|
|
||||||
|
|
||||||
expect_val = t.expect_val
|
|
||||||
if not isinstance(expect_val, dict):
|
|
||||||
expect_val = { t.param_cls.get_name(): expect_val }
|
|
||||||
if read_back_val != expect_val:
|
|
||||||
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
|
|
||||||
|
|
||||||
ok = logloc.replace(' clean_val', '\n\tclean_val'
|
|
||||||
).replace(' read_back_val', '\n\tread_back_val'
|
|
||||||
).replace('=', '=\t'
|
|
||||||
)
|
|
||||||
output = f'\nok: {ok}'
|
|
||||||
outputs.append(output)
|
|
||||||
print(output)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
|
|
||||||
|
|
||||||
output = '\n'.join(outputs) + '\n'
|
|
||||||
xo_name = 'test_configurable_parameters'
|
|
||||||
if update_expected_output:
|
|
||||||
with resources.path(xo, xo_name) as xo_path:
|
|
||||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
|
||||||
f.write(output)
|
|
||||||
else:
|
|
||||||
xo_str = resources.read_text(xo, xo_name)
|
|
||||||
if xo_str != output:
|
|
||||||
at = 0
|
|
||||||
while at < len(output):
|
|
||||||
if output[at] == xo_str[at]:
|
|
||||||
at += 1
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
|
|
||||||
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
if '-u' in sys.argv:
|
|
||||||
update_expected_output = True
|
|
||||||
sys.argv.remove('-u')
|
|
||||||
unittest.main()
|
|
||||||
@@ -63,44 +63,6 @@ class SaipTest(unittest.TestCase):
|
|||||||
# TODO: we don't actually test the results here, but we just verify there is no exception
|
# TODO: we don't actually test the results here, but we just verify there is no exception
|
||||||
pes.to_der()
|
pes.to_der()
|
||||||
|
|
||||||
def test_personalization2(self):
|
|
||||||
"""Test some of the personalization operations."""
|
|
||||||
pes = ProfileElementSequence.from_der(self.per_input)
|
|
||||||
prev_val = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
|
||||||
print(f'{prev_val=}')
|
|
||||||
self.assertTrue(prev_val)
|
|
||||||
|
|
||||||
set_val = '42342342342342342342342342342342'
|
|
||||||
param = SdKeyScp80_01Kic(set_val)
|
|
||||||
param.validate()
|
|
||||||
param.apply(pes)
|
|
||||||
|
|
||||||
get_val1 = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
|
||||||
print(f'{get_val1=} {set_val=}')
|
|
||||||
self.assertEqual(get_val1, set((set_val,)))
|
|
||||||
|
|
||||||
get_val1b = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
|
||||||
print(f'{get_val1b=} {set_val=}')
|
|
||||||
self.assertEqual(get_val1b, set((set_val,)))
|
|
||||||
|
|
||||||
print("HELLOO")
|
|
||||||
der = pes.to_der()
|
|
||||||
print("DONEDONE")
|
|
||||||
|
|
||||||
get_val1c = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
|
||||||
print(f'{get_val1c=} {set_val=}')
|
|
||||||
self.assertEqual(get_val1c, set((set_val,)))
|
|
||||||
|
|
||||||
# assertTrue to not dump the entire der.
|
|
||||||
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
|
|
||||||
# DER and the ConfigurableParameter subclass is buggy.
|
|
||||||
self.assertTrue(der != self.per_input)
|
|
||||||
|
|
||||||
pes2 = ProfileElementSequence.from_der(der)
|
|
||||||
get_val2 = set(SdKeyScp80_01Kic.get_values_from_pes(pes2))
|
|
||||||
print(f'{get_val2=} {set_val=}')
|
|
||||||
self.assertEqual(get_val2, set((set_val,)))
|
|
||||||
|
|
||||||
def test_constructor_encode(self):
|
def test_constructor_encode(self):
|
||||||
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
||||||
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
||||||
|
|||||||
@@ -1,216 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
|
||||||
#
|
|
||||||
# Author: Neels Hofmeyr
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 2 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import math
|
|
||||||
from importlib import resources
|
|
||||||
import unittest
|
|
||||||
from pySim.esim.saip import param_source
|
|
||||||
|
|
||||||
import xo
|
|
||||||
update_expected_output = False
|
|
||||||
|
|
||||||
class D:
|
|
||||||
mandatory = set()
|
|
||||||
optional = set()
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
|
||||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
|
||||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
|
||||||
for k, v in kwargs.items():
|
|
||||||
setattr(self, k, v)
|
|
||||||
for k in self.optional:
|
|
||||||
if not hasattr(self, k):
|
|
||||||
setattr(self, k, None)
|
|
||||||
|
|
||||||
decimals = '0123456789'
|
|
||||||
hexadecimals = '0123456789abcdefABCDEF'
|
|
||||||
|
|
||||||
class FakeRandom:
|
|
||||||
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
|
|
||||||
i = 0
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def next(cls):
|
|
||||||
cls.i = (cls.i + 1) % len(cls.vals)
|
|
||||||
return cls.vals[cls.i]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def randint(a, b):
|
|
||||||
d = b - a
|
|
||||||
n_bytes = math.ceil(math.log(d, 2))
|
|
||||||
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
|
|
||||||
return a + (r % (b - a))
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def randbytes(n):
|
|
||||||
return bytes(FakeRandom.next() for i in range(n))
|
|
||||||
|
|
||||||
|
|
||||||
class ParamSourceTest(unittest.TestCase):
|
|
||||||
|
|
||||||
def test_param_source(self):
|
|
||||||
|
|
||||||
class ParamSourceTest(D):
|
|
||||||
mandatory = (
|
|
||||||
'param_source',
|
|
||||||
'n',
|
|
||||||
'expect',
|
|
||||||
)
|
|
||||||
optional = (
|
|
||||||
'expect_arg',
|
|
||||||
'csv_rows',
|
|
||||||
)
|
|
||||||
|
|
||||||
def expect_const(t, vals):
|
|
||||||
return tuple(t.expect_arg) == tuple(vals)
|
|
||||||
|
|
||||||
def expect_random(t, vals):
|
|
||||||
chars = t.expect_arg.get('digits')
|
|
||||||
repetitions = (t.n - len(set(vals)))
|
|
||||||
if repetitions:
|
|
||||||
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
|
|
||||||
for val_i in range(len(vals)):
|
|
||||||
v = vals[val_i]
|
|
||||||
val_minlen = t.expect_arg.get('val_minlen')
|
|
||||||
val_maxlen = t.expect_arg.get('val_maxlen')
|
|
||||||
if len(v) < val_minlen or len(v) > val_maxlen:
|
|
||||||
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
|
|
||||||
f' {val_minlen}..{val_maxlen}')
|
|
||||||
|
|
||||||
if chars is not None and not all(c in chars for c in v):
|
|
||||||
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
|
|
||||||
return True
|
|
||||||
|
|
||||||
param_source_tests = [
|
|
||||||
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_const,
|
|
||||||
expect_arg=('123', '123', '123')
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': decimals,
|
|
||||||
'val_minlen': 5,
|
|
||||||
'val_maxlen': 5,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
|
|
||||||
n=10,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': decimals,
|
|
||||||
'val_minlen': 1,
|
|
||||||
'val_maxlen': 3,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
|
|
||||||
n=10,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': decimals,
|
|
||||||
'val_minlen': 3,
|
|
||||||
'val_maxlen': 3,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': hexadecimals,
|
|
||||||
'val_minlen': 8,
|
|
||||||
'val_maxlen': 8,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': hexadecimals,
|
|
||||||
'val_minlen': 8,
|
|
||||||
'val_maxlen': 8,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_random,
|
|
||||||
expect_arg={'digits': hexadecimals,
|
|
||||||
'val_minlen': 8,
|
|
||||||
'val_maxlen': 8,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_const,
|
|
||||||
expect_arg=('10001', '10002', '10003')
|
|
||||||
),
|
|
||||||
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
|
|
||||||
n=3,
|
|
||||||
expect=expect_const,
|
|
||||||
expect_arg=('first val', 'second val', 'third val'),
|
|
||||||
csv_rows=(
|
|
||||||
{'column_name': 'first val',},
|
|
||||||
{'column_name': 'second val',},
|
|
||||||
{'column_name': 'third val',},
|
|
||||||
)
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
outputs = []
|
|
||||||
|
|
||||||
for t in param_source_tests:
|
|
||||||
try:
|
|
||||||
if hasattr(t.param_source, 'random_impl'):
|
|
||||||
t.param_source.random_impl = FakeRandom
|
|
||||||
|
|
||||||
vals = []
|
|
||||||
for i in range(t.n):
|
|
||||||
csv_row = None
|
|
||||||
if t.csv_rows is not None:
|
|
||||||
csv_row = t.csv_rows[i]
|
|
||||||
vals.append( t.param_source.get_next(csv_row=csv_row) )
|
|
||||||
if not t.expect(t, vals):
|
|
||||||
raise RuntimeError(f'invalid values returned: returned {vals}')
|
|
||||||
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
|
|
||||||
outputs.append(output)
|
|
||||||
print(output)
|
|
||||||
except RuntimeError as e:
|
|
||||||
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
|
|
||||||
|
|
||||||
output = '\n'.join(outputs) + '\n'
|
|
||||||
xo_name = 'test_param_src'
|
|
||||||
if update_expected_output:
|
|
||||||
with resources.path(xo, xo_name) as xo_path:
|
|
||||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
|
||||||
f.write(output)
|
|
||||||
else:
|
|
||||||
xo_str = resources.read_text(xo, xo_name)
|
|
||||||
if xo_str != output:
|
|
||||||
at = 0
|
|
||||||
while at < len(output):
|
|
||||||
if output[at] == xo_str[at]:
|
|
||||||
at += 1
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
|
|
||||||
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
if '-u' in sys.argv:
|
|
||||||
update_expected_output = True
|
|
||||||
sys.argv.remove('-u')
|
|
||||||
unittest.main()
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -1,9 +0,0 @@
|
|||||||
ok: ConstantSource vals=['123', '123', '123']
|
|
||||||
ok: RandomDigitSource vals=['13987', '49298', '55670']
|
|
||||||
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
|
|
||||||
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
|
|
||||||
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
|
|
||||||
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
|
|
||||||
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
|
|
||||||
ok: IncDigitSource vals=['10001', '10002', '10003']
|
|
||||||
ok: CsvSource vals=['first val', 'second val', 'third val']
|
|
||||||
Reference in New Issue
Block a user