mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-06-24 08:48:30 +03:00
Compare commits
47 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d0e6a1b119 | |||
| 980282cc12 | |||
| 728940efb2 | |||
| cfe2b94f67 | |||
| 861ed0a1d8 | |||
| b576e8fcff | |||
| 38f93d974b | |||
| c5e7e59928 | |||
| 98af3dd2e9 | |||
| e9ff4f3b93 | |||
| ce039d69ba | |||
| aad92f2b73 | |||
| 512aba8b1d | |||
| b5ba274583 | |||
| 4307cffc82 | |||
| bfdfcad22c | |||
| ef0a2fcb37 | |||
| 3974e96933 | |||
| a7c762eb2e | |||
| 710a27d6cf | |||
| 08f40db8a3 | |||
| 4fb393e6ea | |||
| ce5da32a75 | |||
| 9ddd235a2c | |||
| 77eb30a782 | |||
| 2530329ae2 | |||
| f9e4291a43 | |||
| 20538775b2 | |||
| ef58c94dfe | |||
| 810c51c38f | |||
| 66d3b54f92 | |||
| 7d11f91778 | |||
| 58a324126e | |||
| 3cd5c41fb4 | |||
| 593bfa0911 | |||
| 8fa7727a14 | |||
| f1609424de | |||
| 1167b65e2a | |||
| cd4b01f67e | |||
| 393de033d3 | |||
| 5f1c7d603c | |||
| d7072e9263 | |||
| ac593bb14d | |||
| a95622a022 | |||
| 03b58985a5 | |||
| cc71dbf899 | |||
| aafc8d51c3 |
+20
-14
@@ -10,6 +10,11 @@
|
|||||||
|
|
||||||
export PYTHONUNBUFFERED=1
|
export PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
setup_venv() {
|
||||||
|
virtualenv -p python3 venv --system-site-packages
|
||||||
|
. venv/bin/activate
|
||||||
|
}
|
||||||
|
|
||||||
if [ ! -d "./tests/" ] ; then
|
if [ ! -d "./tests/" ] ; then
|
||||||
echo "###############################################"
|
echo "###############################################"
|
||||||
echo "Please call from pySim-prog top directory"
|
echo "Please call from pySim-prog top directory"
|
||||||
@@ -23,8 +28,7 @@ fi
|
|||||||
|
|
||||||
case "$JOB_TYPE" in
|
case "$JOB_TYPE" in
|
||||||
"test")
|
"test")
|
||||||
virtualenv -p python3 venv --system-site-packages
|
setup_venv
|
||||||
. venv/bin/activate
|
|
||||||
|
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
pip install pyshark
|
pip install pyshark
|
||||||
@@ -32,23 +36,27 @@ case "$JOB_TYPE" in
|
|||||||
# Execute automatically discovered unit tests first
|
# Execute automatically discovered unit tests first
|
||||||
python -m unittest discover -v -s tests/unittests
|
python -m unittest discover -v -s tests/unittests
|
||||||
|
|
||||||
# Run pySim-prog integration tests (requires physical cards)
|
|
||||||
cd tests/pySim-prog_test/
|
|
||||||
./pySim-prog_test.sh
|
|
||||||
cd ../../
|
|
||||||
|
|
||||||
# Run pySim-trace test
|
# Run pySim-trace test
|
||||||
tests/pySim-trace_test/pySim-trace_test.sh
|
tests/pySim-trace_test/pySim-trace_test.sh
|
||||||
|
;;
|
||||||
|
"card-test") # tests requiring physical cards
|
||||||
|
setup_venv
|
||||||
|
|
||||||
# Run pySim-shell integration tests (requires physical cards)
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
# Run pySim-prog integration tests
|
||||||
|
cd tests/pySim-prog_test/
|
||||||
|
./pySim-prog_test.sh
|
||||||
|
cd ../../
|
||||||
|
|
||||||
|
# Run pySim-shell integration tests
|
||||||
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
|
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
|
||||||
|
|
||||||
# Run pySim-smpp2sim test
|
# Run pySim-smpp2sim test
|
||||||
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
|
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
|
||||||
;;
|
;;
|
||||||
"distcheck")
|
"distcheck")
|
||||||
virtualenv -p python3 venv --system-site-packages
|
setup_venv
|
||||||
. venv/bin/activate
|
|
||||||
|
|
||||||
pip install .
|
pip install .
|
||||||
pip install pyshark
|
pip install pyshark
|
||||||
@@ -61,8 +69,7 @@ case "$JOB_TYPE" in
|
|||||||
# Print pylint version
|
# Print pylint version
|
||||||
pip3 freeze | grep pylint
|
pip3 freeze | grep pylint
|
||||||
|
|
||||||
virtualenv -p python3 venv --system-site-packages
|
setup_venv
|
||||||
. venv/bin/activate
|
|
||||||
|
|
||||||
pip install .
|
pip install .
|
||||||
|
|
||||||
@@ -80,8 +87,7 @@ case "$JOB_TYPE" in
|
|||||||
contrib/*.py
|
contrib/*.py
|
||||||
;;
|
;;
|
||||||
"docs")
|
"docs")
|
||||||
virtualenv -p python3 venv --system-site-packages
|
setup_venv
|
||||||
. venv/bin/activate
|
|
||||||
|
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
|||||||
+1
-1
@@ -640,7 +640,7 @@ class SmDppHttpServer:
|
|||||||
# look up profile based on matchingID. We simply check if a given file exists for now..
|
# look up profile based on matchingID. We simply check if a given file exists for now..
|
||||||
path = os.path.join(self.upp_dir, matchingId) + '.der'
|
path = os.path.join(self.upp_dir, matchingId) + '.der'
|
||||||
# prevent directory traversal attack
|
# prevent directory traversal attack
|
||||||
if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
|
if os.path.commonpath((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
|
||||||
raise ApiError('8.2.6', '3.8', 'Refused')
|
raise ApiError('8.2.6', '3.8', 'Refused')
|
||||||
if not os.path.isfile(path) or not os.access(path, os.R_OK):
|
if not os.path.isfile(path) or not os.access(path, os.R_OK):
|
||||||
raise ApiError('8.2.6', '3.8', 'Refused')
|
raise ApiError('8.2.6', '3.8', 'Refused')
|
||||||
|
|||||||
+5
-24
@@ -69,8 +69,8 @@ from pySim.ts_102_222 import Ts102222Commands
|
|||||||
from pySim.gsm_r import DF_EIRENE
|
from pySim.gsm_r import DF_EIRENE
|
||||||
from pySim.cat import ProactiveCommand
|
from pySim.cat import ProactiveCommand
|
||||||
|
|
||||||
from pySim.card_key_provider import CardKeyProviderCsv, CardKeyProviderPgsql
|
from pySim.card_key_provider import card_key_provider_argparse_add_args, card_key_provider_init
|
||||||
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
from pySim.card_key_provider import card_key_provider_get_field, card_key_provider_get
|
||||||
|
|
||||||
from pySim.app import init_card
|
from pySim.app import init_card
|
||||||
|
|
||||||
@@ -1146,18 +1146,6 @@ global_group.add_argument("--skip-card-init", help="Skip all card/profile initia
|
|||||||
global_group.add_argument("--verbose", help="Enable verbose logging",
|
global_group.add_argument("--verbose", help="Enable verbose logging",
|
||||||
action='store_true', default=False)
|
action='store_true', default=False)
|
||||||
|
|
||||||
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
|
||||||
card_key_group.add_argument('--csv', metavar='FILE',
|
|
||||||
default="~/.osmocom/pysim/card_data.csv",
|
|
||||||
help='Read card data from CSV file')
|
|
||||||
card_key_group.add_argument('--pgsql', metavar='FILE',
|
|
||||||
default="~/.osmocom/pysim/card_data_pgsql.cfg",
|
|
||||||
help='Read card data from PostgreSQL database (config file)')
|
|
||||||
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
|
||||||
help=argparse.SUPPRESS, dest='column_key')
|
|
||||||
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
|
||||||
help='per-column AES transport key', dest='column_key')
|
|
||||||
|
|
||||||
adm_group = global_group.add_mutually_exclusive_group()
|
adm_group = global_group.add_mutually_exclusive_group()
|
||||||
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
||||||
help='ADM PIN used for provisioning (overwrites default)')
|
help='ADM PIN used for provisioning (overwrites default)')
|
||||||
@@ -1170,6 +1158,7 @@ option_parser.add_argument("command", nargs='?',
|
|||||||
help="A pySim-shell command that would optionally be executed at startup")
|
help="A pySim-shell command that would optionally be executed at startup")
|
||||||
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
||||||
help="Optional Arguments for command")
|
help="Optional Arguments for command")
|
||||||
|
card_key_provider_argparse_add_args(option_parser)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
startup_errors = False
|
startup_errors = False
|
||||||
@@ -1178,16 +1167,8 @@ if __name__ == '__main__':
|
|||||||
# Ensure that we are able to print formatted warnings from the beginning.
|
# Ensure that we are able to print formatted warnings from the beginning.
|
||||||
PySimLogger.setup(print, {logging.WARN: YELLOW}, opts.verbose)
|
PySimLogger.setup(print, {logging.WARN: YELLOW}, opts.verbose)
|
||||||
|
|
||||||
# Register csv-file as card data provider, either from specified CSV
|
# Init card key provider for automatic card key retrieval
|
||||||
# or from CSV file in home directory
|
card_key_provider_init(opts)
|
||||||
column_keys = {}
|
|
||||||
for par in opts.column_key:
|
|
||||||
name, key = par.split(':')
|
|
||||||
column_keys[name] = key
|
|
||||||
if os.path.isfile(os.path.expanduser(opts.csv)):
|
|
||||||
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), column_keys))
|
|
||||||
if os.path.isfile(os.path.expanduser(opts.pgsql)):
|
|
||||||
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), column_keys))
|
|
||||||
|
|
||||||
# Init card reader driver
|
# Init card reader driver
|
||||||
sl = init_reader(opts, proactive_handler = Proact())
|
sl = init_reader(opts, proactive_handler = Proact())
|
||||||
|
|||||||
+7
-4
@@ -26,6 +26,9 @@ from pySim.cdma_ruim import CardProfileRUIM
|
|||||||
from pySim.ts_102_221 import CardProfileUICC
|
from pySim.ts_102_221 import CardProfileUICC
|
||||||
from pySim.utils import all_subclasses
|
from pySim.utils import all_subclasses
|
||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
|
log = PySimLogger.get(__name__)
|
||||||
|
|
||||||
# we need to import this module so that the SysmocomSJA2 sub-class of
|
# we need to import this module so that the SysmocomSJA2 sub-class of
|
||||||
# CardModel is created, which will add the ATR-based matching and
|
# CardModel is created, which will add the ATR-based matching and
|
||||||
@@ -54,7 +57,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
|
|||||||
|
|
||||||
# Wait up to three seconds for a card in reader and try to detect
|
# Wait up to three seconds for a card in reader and try to detect
|
||||||
# the card type.
|
# the card type.
|
||||||
print("Waiting for card...")
|
log.info("Waiting for card...")
|
||||||
sl.wait_for_card(3)
|
sl.wait_for_card(3)
|
||||||
|
|
||||||
# The user may opt to skip all card initialization. In this case only the
|
# The user may opt to skip all card initialization. In this case only the
|
||||||
@@ -66,7 +69,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
|
|||||||
generic_card = False
|
generic_card = False
|
||||||
card = card_detect(scc)
|
card = card_detect(scc)
|
||||||
if card is None:
|
if card is None:
|
||||||
print("Warning: Could not detect card type - assuming a generic card type...")
|
log.warning("Could not detect card type - assuming a generic card type...")
|
||||||
card = SimCardBase(scc)
|
card = SimCardBase(scc)
|
||||||
generic_card = True
|
generic_card = True
|
||||||
|
|
||||||
@@ -76,7 +79,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
|
|||||||
# just means that pySim was unable to recognize the card profile. This
|
# just means that pySim was unable to recognize the card profile. This
|
||||||
# may happen in particular with unprovisioned cards that do not have
|
# may happen in particular with unprovisioned cards that do not have
|
||||||
# any files on them yet.
|
# any files on them yet.
|
||||||
print("Unsupported card type!")
|
log.warning("Unsupported card type!")
|
||||||
return None, card
|
return None, card
|
||||||
|
|
||||||
# ETSI TS 102 221, Table 9.3 specifies a default for the PIN key
|
# ETSI TS 102 221, Table 9.3 specifies a default for the PIN key
|
||||||
@@ -87,7 +90,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
|
|||||||
if generic_card and isinstance(profile, CardProfileUICC):
|
if generic_card and isinstance(profile, CardProfileUICC):
|
||||||
card._adm_chv_num = 0x0A
|
card._adm_chv_num = 0x0A
|
||||||
|
|
||||||
print("Info: Card is of type: %s" % str(profile))
|
log.info("Card is of type: %s", str(profile))
|
||||||
|
|
||||||
# FIXME: this shouldn't really be here but somewhere else/more generic.
|
# FIXME: this shouldn't really be here but somewhere else/more generic.
|
||||||
# We cannot do it within pySim/profile.py as that would create circular
|
# We cannot do it within pySim/profile.py as that would create circular
|
||||||
|
|||||||
+2
-2
@@ -334,10 +334,10 @@ class ADF_ARAM(CardADF):
|
|||||||
apdu_grp.add_argument(
|
apdu_grp.add_argument(
|
||||||
'--apdu-filter', help='APDU filter: multiple groups of 8 hex bytes (4 byte CLA/INS/P1/P2 followed by 4 byte mask)')
|
'--apdu-filter', help='APDU filter: multiple groups of 8 hex bytes (4 byte CLA/INS/P1/P2 followed by 4 byte mask)')
|
||||||
nfc_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
nfc_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
||||||
nfc_grp.add_argument('--nfc-always', action='store_true',
|
|
||||||
help='NFC event access is allowed')
|
|
||||||
nfc_grp.add_argument('--nfc-never', action='store_true',
|
nfc_grp.add_argument('--nfc-never', action='store_true',
|
||||||
help='NFC event access is not allowed')
|
help='NFC event access is not allowed')
|
||||||
|
nfc_grp.add_argument('--nfc-always', action='store_true',
|
||||||
|
help='NFC event access is allowed')
|
||||||
store_ref_ar_do_parse.add_argument(
|
store_ref_ar_do_parse.add_argument(
|
||||||
'--android-permissions', help='Android UICC Carrier Privilege Permissions (8 hex bytes)')
|
'--android-permissions', help='Android UICC Carrier Privilege Permissions (8 hex bytes)')
|
||||||
|
|
||||||
|
|||||||
@@ -33,10 +33,12 @@ from Cryptodome.Cipher import AES
|
|||||||
from osmocom.utils import h2b, b2h
|
from osmocom.utils import h2b, b2h
|
||||||
from pySim.log import PySimLogger
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
|
import os
|
||||||
import abc
|
import abc
|
||||||
import csv
|
import csv
|
||||||
import logging
|
import logging
|
||||||
import yaml
|
import yaml
|
||||||
|
import argparse
|
||||||
|
|
||||||
log = PySimLogger.get(__name__)
|
log = PySimLogger.get(__name__)
|
||||||
|
|
||||||
@@ -130,6 +132,31 @@ class CardKeyFieldCryptor:
|
|||||||
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||||
return b2h(cipher.encrypt(h2b(plaintext_val)))
|
return b2h(cipher.encrypt(h2b(plaintext_val)))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def argparse_add_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
arg_parser.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
|
help='per-column AES transport key', dest='column_key')
|
||||||
|
# Depprecated argument, replaced by --column-key (see above)
|
||||||
|
arg_parser.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
|
help=argparse.SUPPRESS, dest='column_key')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def transport_keys_from_opts(opts: argparse.Namespace) -> dict:
|
||||||
|
"""
|
||||||
|
Transport keys are passed via the commandline using the '--column-key' option. Each column requires a
|
||||||
|
dedicated transport key. This method can be used to extract the column keys parameters from the commandline
|
||||||
|
options into a dict that can be directly passed to the construtor with the transport_keys argument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
opts: parsed commandline options (Namespace)
|
||||||
|
"""
|
||||||
|
|
||||||
|
transport_keys = {}
|
||||||
|
for par in opts.column_key:
|
||||||
|
name, key = par.split(':')
|
||||||
|
transport_keys[name] = key
|
||||||
|
return transport_keys
|
||||||
|
|
||||||
class CardKeyProvider(abc.ABC):
|
class CardKeyProvider(abc.ABC):
|
||||||
"""Base class, not containing any concrete implementation."""
|
"""Base class, not containing any concrete implementation."""
|
||||||
|
|
||||||
@@ -148,24 +175,33 @@ class CardKeyProvider(abc.ABC):
|
|||||||
fond None shall be returned.
|
fond None shall be returned.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def argparse_add_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
"""
|
||||||
|
Add the commandline arguments relevant for this card key provider.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
arg_parser : argument parser group
|
||||||
|
"""
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return type(self).__name__
|
return type(self).__name__
|
||||||
|
|
||||||
class CardKeyProviderCsv(CardKeyProvider):
|
class CardKeyProviderCsv(CardKeyProvider):
|
||||||
"""Card key provider implementation that allows to query against a specified CSV file."""
|
"""Card key provider implementation that allows to query against a specified CSV file."""
|
||||||
|
|
||||||
def __init__(self, csv_filename: str, transport_keys: dict):
|
def __init__(self, csv_filename: str, field_cryptor: CardKeyFieldCryptor):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
csv_filename : file name (path) of CSV file containing card-individual key/data
|
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||||
transport_keys : (see class CardKeyFieldCryptor)
|
field_cryptor : (see class CardKeyFieldCryptor)
|
||||||
"""
|
"""
|
||||||
log.info("Using CSV file as card key data source: %s" % csv_filename)
|
log.info("Using CSV file as card key data source: %s" % csv_filename)
|
||||||
self.csv_file = open(csv_filename, 'r')
|
self.csv_file = open(csv_filename, 'r')
|
||||||
if not self.csv_file:
|
if not self.csv_file:
|
||||||
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||||
self.csv_filename = csv_filename
|
self.csv_filename = csv_filename
|
||||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
self.crypt = field_cryptor
|
||||||
|
|
||||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||||
self.csv_file.seek(0)
|
self.csv_file.seek(0)
|
||||||
@@ -188,14 +224,20 @@ class CardKeyProviderCsv(CardKeyProvider):
|
|||||||
return None
|
return None
|
||||||
return return_dict
|
return return_dict
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def argparse_add_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
arg_parser.add_argument('--csv', metavar='FILE',
|
||||||
|
default="~/.osmocom/pysim/card_data.csv",
|
||||||
|
help='Read card data from CSV file')
|
||||||
|
|
||||||
class CardKeyProviderPgsql(CardKeyProvider):
|
class CardKeyProviderPgsql(CardKeyProvider):
|
||||||
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
|
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
|
||||||
|
|
||||||
def __init__(self, config_filename: str, transport_keys: dict):
|
def __init__(self, config_filename: str, field_cryptor: CardKeyFieldCryptor):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
config_filename : file name (path) of CSV file containing card-individual key/data
|
config_filename : file name (path) of CSV file containing card-individual key/data
|
||||||
transport_keys : (see class CardKeyFieldCryptor)
|
field_cryptor : (see class CardKeyFieldCryptor)
|
||||||
"""
|
"""
|
||||||
import psycopg2
|
import psycopg2
|
||||||
log.info("Using SQL database as card key data source: %s" % config_filename)
|
log.info("Using SQL database as card key data source: %s" % config_filename)
|
||||||
@@ -212,7 +254,7 @@ class CardKeyProviderPgsql(CardKeyProvider):
|
|||||||
host=config.get('host'))
|
host=config.get('host'))
|
||||||
self.tables = config.get('table_names')
|
self.tables = config.get('table_names')
|
||||||
log.info("Card key database tables: %s" % str(self.tables))
|
log.info("Card key database tables: %s" % str(self.tables))
|
||||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
self.crypt = field_cryptor
|
||||||
|
|
||||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||||
import psycopg2
|
import psycopg2
|
||||||
@@ -252,6 +294,11 @@ class CardKeyProviderPgsql(CardKeyProvider):
|
|||||||
result[k] = self.crypt.decrypt_field(k, result.get(k))
|
result[k] = self.crypt.decrypt_field(k, result.get(k))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def argparse_add_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
arg_parser.add_argument('--pgsql', metavar='FILE',
|
||||||
|
default="~/.osmocom/pysim/card_data_pgsql.cfg",
|
||||||
|
help='Read card data from PostgreSQL database (config file)')
|
||||||
|
|
||||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||||
"""Register a new card key provider.
|
"""Register a new card key provider.
|
||||||
@@ -305,3 +352,19 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
|
|||||||
fields = [field]
|
fields = [field]
|
||||||
result = card_key_provider_get(fields, key, value, card_key_providers)
|
result = card_key_provider_get(fields, key, value, card_key_providers)
|
||||||
return result.get(field.upper())
|
return result.get(field.upper())
|
||||||
|
|
||||||
|
def card_key_provider_argparse_add_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
"""Add card key provider commandline options to the given argument parser"""
|
||||||
|
card_key_group = arg_parser.add_argument_group('Card Key Provider Options')
|
||||||
|
CardKeyProviderCsv.argparse_add_args(card_key_group)
|
||||||
|
CardKeyProviderPgsql.argparse_add_args(card_key_group)
|
||||||
|
CardKeyFieldCryptor.argparse_add_args(card_key_group)
|
||||||
|
|
||||||
|
def card_key_provider_init(opts: argparse.Namespace):
|
||||||
|
"""Initialize card key provider depending on the user provided commandline options"""
|
||||||
|
transport_keys = CardKeyFieldCryptor.transport_keys_from_opts(opts)
|
||||||
|
card_key_field_cryptor = CardKeyFieldCryptor(transport_keys)
|
||||||
|
if os.path.isfile(os.path.expanduser(opts.csv)):
|
||||||
|
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), card_key_field_cryptor))
|
||||||
|
if os.path.isfile(os.path.expanduser(opts.pgsql)):
|
||||||
|
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), card_key_field_cryptor))
|
||||||
|
|||||||
@@ -1079,6 +1079,13 @@ class SecurityDomainKey:
|
|||||||
'keyVersionNumber': bytes([self.key_version_number]),
|
'keyVersionNumber': bytes([self.key_version_number]),
|
||||||
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
||||||
|
|
||||||
|
def get_key_component(self, key_type):
|
||||||
|
for kc in self.key_components:
|
||||||
|
if kc.key_type == key_type:
|
||||||
|
return kc.key_data
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class ProfileElementSD(ProfileElement):
|
class ProfileElementSD(ProfileElement):
|
||||||
"""Class representing a securityDomain ProfileElement."""
|
"""Class representing a securityDomain ProfileElement."""
|
||||||
type = 'securityDomain'
|
type = 'securityDomain'
|
||||||
|
|||||||
@@ -0,0 +1,360 @@
|
|||||||
|
"""Implementation of Personalization of eSIM profiles in SimAlliance/TCA Interoperable Profile:
|
||||||
|
Run a batch of N personalizations"""
|
||||||
|
|
||||||
|
# (C) 2025-2026 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||||
|
#
|
||||||
|
# Author: nhofmeyr@sysmocom.de
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import pprint
|
||||||
|
from typing import Generator, Union
|
||||||
|
from pySim.esim.saip.personalization import ConfigurableParameter
|
||||||
|
from pySim.esim.saip import param_source
|
||||||
|
from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
|
||||||
|
from pySim.global_platform import KeyUsageQualifier
|
||||||
|
from osmocom.utils import b2h
|
||||||
|
|
||||||
|
# a list of ConfigurableParameter classes and/or ConfigurableParameter class instances
|
||||||
|
ParamList = list[Union[type[ConfigurableParameter], ConfigurableParameter]]
|
||||||
|
|
||||||
|
class BatchPersonalization:
|
||||||
|
"""Produce a series of eSIM profiles from predefined parameters.
|
||||||
|
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
|
||||||
|
|
||||||
|
Usage example:
|
||||||
|
|
||||||
|
der_input = open('some_file', 'rb').read()
|
||||||
|
pes = ProfileElementSequence.from_der(der_input)
|
||||||
|
p = BatchPersonalization(
|
||||||
|
n=10,
|
||||||
|
src_pes=pes,
|
||||||
|
csv_rows=get_csv_reader())
|
||||||
|
|
||||||
|
p.add_param_and_src(
|
||||||
|
personalization.Iccid(),
|
||||||
|
param_source.IncDigitSource(
|
||||||
|
num_digits=18,
|
||||||
|
first_value=123456789012340001,
|
||||||
|
last_value=123456789012340010))
|
||||||
|
|
||||||
|
# add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile
|
||||||
|
# ...
|
||||||
|
|
||||||
|
# generate all 10 profiles (from n=10 above)
|
||||||
|
for result_pes in p.generate_profiles():
|
||||||
|
upp = result_pes.to_der()
|
||||||
|
store_upp(upp)
|
||||||
|
"""
|
||||||
|
|
||||||
|
class ParamAndSrc:
|
||||||
|
"""tie a ConfigurableParameter to a source of actual values"""
|
||||||
|
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
|
||||||
|
if isinstance(param, type):
|
||||||
|
self.param_cls = param
|
||||||
|
else:
|
||||||
|
self.param_cls = param.__class__
|
||||||
|
self.src = src
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
n: int,
|
||||||
|
src_pes: ProfileElementSequence,
|
||||||
|
params: list[ParamAndSrc]=None,
|
||||||
|
csv_rows: Generator=None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
n: number of eSIM profiles to generate.
|
||||||
|
src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only
|
||||||
|
copied.
|
||||||
|
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
|
||||||
|
profile values.
|
||||||
|
csv_rows: A generator (e.g. iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
|
||||||
|
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||||
|
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
|
||||||
|
param_source.CsvSource.
|
||||||
|
"""
|
||||||
|
self.n = n
|
||||||
|
self.params = params or []
|
||||||
|
self.src_pes = src_pes
|
||||||
|
self.csv_rows = csv_rows
|
||||||
|
|
||||||
|
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
|
||||||
|
self.params.append(BatchPersonalization.ParamAndSrc(param, src))
|
||||||
|
|
||||||
|
def generate_profiles(self):
|
||||||
|
# get first row of CSV: column names
|
||||||
|
csv_columns = None
|
||||||
|
if self.csv_rows:
|
||||||
|
try:
|
||||||
|
csv_columns = next(self.csv_rows)
|
||||||
|
except StopIteration as e:
|
||||||
|
raise ValueError('the input CSV file appears to be empty') from e
|
||||||
|
|
||||||
|
for i in range(self.n):
|
||||||
|
csv_row = None
|
||||||
|
if self.csv_rows and csv_columns:
|
||||||
|
try:
|
||||||
|
csv_row_list = next(self.csv_rows)
|
||||||
|
except StopIteration as e:
|
||||||
|
raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
|
||||||
|
|
||||||
|
csv_row = dict(zip(csv_columns, csv_row_list))
|
||||||
|
|
||||||
|
pes = copy.deepcopy(self.src_pes)
|
||||||
|
|
||||||
|
for p in self.params:
|
||||||
|
try:
|
||||||
|
input_value = p.src.get_next(csv_row=csv_row)
|
||||||
|
assert input_value is not None
|
||||||
|
value = p.param_cls.validate_val(input_value)
|
||||||
|
p.param_cls.apply_val(pes, value)
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
|
||||||
|
|
||||||
|
yield pes
|
||||||
|
|
||||||
|
|
||||||
|
class UppAudit(dict):
|
||||||
|
"""
|
||||||
|
Key-value pairs collected from a single UPP DER or PES.
|
||||||
|
|
||||||
|
UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_der(cls, der: bytes, params: ParamList, der_size=False, additional_sd_keys=False):
|
||||||
|
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||||
|
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
|
||||||
|
both 'IMSI' and 'IMSI-ACC' parameters.
|
||||||
|
|
||||||
|
e.g.
|
||||||
|
UppAudit.from_der(my_der, [Imsi, ])
|
||||||
|
--> {'IMSI': {'001010000000023'}, 'IMSI-ACC': {'5'}}
|
||||||
|
|
||||||
|
(where 'IMSI' == Imsi.name)
|
||||||
|
|
||||||
|
Read all parameters listed in params. params is a list of either ConfigurableParameter classes or
|
||||||
|
ConfigurableParameter class instances. This calls only classmethods, so each entry in params can either be the
|
||||||
|
class itself, or a class-instance of, a (non-abstract) ConfigurableParameter subclass.
|
||||||
|
For example, params = [Imsi, ] is equivalent to params = [Imsi(), ].
|
||||||
|
|
||||||
|
For der_size=True, also include a {'der_size':12345} entry.
|
||||||
|
|
||||||
|
For additional_sd_keys=True, output also all Security Domain KVN that there are *no* ConfigurableParameter
|
||||||
|
subclasses for. For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only Scp80Kvn01, Scp80Kvn02,
|
||||||
|
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
|
||||||
|
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
|
||||||
|
unexpected / unusual kvn.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# make an instance of this class
|
||||||
|
upp_audit = cls()
|
||||||
|
|
||||||
|
if der_size:
|
||||||
|
upp_audit['der_size'] = set((len(der), ))
|
||||||
|
|
||||||
|
pes = ProfileElementSequence.from_der(der)
|
||||||
|
for param in params:
|
||||||
|
try:
|
||||||
|
for valdict in param.get_values_from_pes(pes):
|
||||||
|
upp_audit.add_values(valdict)
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f'Error during audit for parameter {param}: {e}') from e
|
||||||
|
|
||||||
|
if not additional_sd_keys:
|
||||||
|
return upp_audit
|
||||||
|
|
||||||
|
# additional_sd_keys
|
||||||
|
for pe in pes.pe_list:
|
||||||
|
if pe.type != 'securityDomain':
|
||||||
|
continue
|
||||||
|
assert isinstance(pe, ProfileElementSD)
|
||||||
|
|
||||||
|
for key in pe.keys:
|
||||||
|
audit_key = f'SdKey_KVN{key.key_version_number:02x}_ID{key.key_identifier:02x}'
|
||||||
|
kuq_bin = KeyUsageQualifier.build(key.key_usage_qualifier).hex()
|
||||||
|
audit_val = f'{key.key_components=!r} key_usage_qualifier=0x{kuq_bin}={key.key_usage_qualifier!r}'
|
||||||
|
upp_audit.add_values({audit_key: audit_val})
|
||||||
|
|
||||||
|
return upp_audit
|
||||||
|
|
||||||
|
def get_single_val(self, key, allow_absent=False, absent_val=None):
|
||||||
|
"""
|
||||||
|
Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').
|
||||||
|
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
|
||||||
|
return that value. When they do not agree, raise a ValueError.
|
||||||
|
"""
|
||||||
|
# key should be a string, but if someone passes a ConfigurableParameter, just use its default name
|
||||||
|
if ConfigurableParameter.is_super_of(key):
|
||||||
|
key = key.get_name()
|
||||||
|
|
||||||
|
assert isinstance(key, str)
|
||||||
|
v = self.get(key)
|
||||||
|
if v is None and allow_absent:
|
||||||
|
return absent_val
|
||||||
|
if not isinstance(v, set):
|
||||||
|
raise ValueError(f'audit value should be a set(), got {v!r}')
|
||||||
|
if len(v) != 1:
|
||||||
|
raise ValueError(f'expected a single value for {key}, got {v!r}')
|
||||||
|
v = tuple(v)[0]
|
||||||
|
return v
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def audit_val_to_str(v):
|
||||||
|
"""
|
||||||
|
Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
|
||||||
|
audit values are always python sets. Turn it into a nice string representation: only the value when it is
|
||||||
|
unambiguous, otherwise a list of the ambiguous values.
|
||||||
|
A value may also be completely absent, then return 'not present'.
|
||||||
|
"""
|
||||||
|
def try_single_val(w):
|
||||||
|
'change single-entry sets to just the single value'
|
||||||
|
if isinstance(w, set):
|
||||||
|
if len(w) == 1:
|
||||||
|
return tuple(w)[0]
|
||||||
|
if len(w) == 0:
|
||||||
|
return None
|
||||||
|
return w
|
||||||
|
|
||||||
|
v = try_single_val(v)
|
||||||
|
if isinstance(v, bytes):
|
||||||
|
v = b2h(v)
|
||||||
|
if v is None:
|
||||||
|
return 'not present'
|
||||||
|
return str(v)
|
||||||
|
|
||||||
|
def get_val_str(self, key):
|
||||||
|
"""Return a string of the value stored for the given key"""
|
||||||
|
return UppAudit.audit_val_to_str(self.get(key))
|
||||||
|
|
||||||
|
def add_values(self, src:dict):
|
||||||
|
"""Merge a plain dict of values into self, which is a dict of sets.
|
||||||
|
For example from
|
||||||
|
self == { 'a': {123} }
|
||||||
|
and
|
||||||
|
src == { 'a': 456, 'b': 789 }
|
||||||
|
then after this function call:
|
||||||
|
self == { 'a': {123, 456}, 'b': {789} }
|
||||||
|
"""
|
||||||
|
assert isinstance(src, dict)
|
||||||
|
for key, srcval in src.items():
|
||||||
|
dstvalset = self.get(key)
|
||||||
|
if dstvalset is None:
|
||||||
|
dstvalset = set()
|
||||||
|
self[key] = dstvalset
|
||||||
|
dstvalset.add(srcval)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
|
||||||
|
|
||||||
|
class BatchAudit(list):
|
||||||
|
"""
|
||||||
|
Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
|
||||||
|
Produce an output CSV.
|
||||||
|
|
||||||
|
Usage example:
|
||||||
|
|
||||||
|
ba = BatchAudit(params=(personalization.Iccid, ))
|
||||||
|
for upp_der in upps:
|
||||||
|
ba.add_audit(upp_der)
|
||||||
|
print(ba.summarize())
|
||||||
|
|
||||||
|
with open('output.csv', 'wb') as csv_data:
|
||||||
|
csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
|
||||||
|
csv.writer(csv_str).writerows( ba.to_csv_rows() )
|
||||||
|
csv_str.flush()
|
||||||
|
|
||||||
|
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, params: ParamList):
|
||||||
|
assert params
|
||||||
|
self.params = params
|
||||||
|
|
||||||
|
def add_audit(self, upp_der:bytes):
|
||||||
|
audit = UppAudit.from_der(upp_der, self.params)
|
||||||
|
self.append(audit)
|
||||||
|
return audit
|
||||||
|
|
||||||
|
def summarize(self):
|
||||||
|
batch_audit = UppAudit()
|
||||||
|
|
||||||
|
audits = self
|
||||||
|
|
||||||
|
if len(audits) > 2:
|
||||||
|
val_sep = ', ..., '
|
||||||
|
else:
|
||||||
|
val_sep = ', '
|
||||||
|
|
||||||
|
first_audit = None
|
||||||
|
last_audit = None
|
||||||
|
if len(audits) >= 1:
|
||||||
|
first_audit = audits[0]
|
||||||
|
if len(audits) >= 2:
|
||||||
|
last_audit = audits[-1]
|
||||||
|
|
||||||
|
if first_audit:
|
||||||
|
if last_audit:
|
||||||
|
for key in first_audit.keys():
|
||||||
|
first_val = first_audit.get_val_str(key)
|
||||||
|
last_val = last_audit.get_val_str(key)
|
||||||
|
|
||||||
|
if first_val == last_val:
|
||||||
|
val = first_val
|
||||||
|
else:
|
||||||
|
val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
|
||||||
|
val = val_sep_with_newline.join((first_val, last_val))
|
||||||
|
batch_audit[key] = val
|
||||||
|
else:
|
||||||
|
batch_audit.update(first_audit)
|
||||||
|
|
||||||
|
return batch_audit
|
||||||
|
|
||||||
|
def to_csv_rows(self, headers=True, sort_key=None):
|
||||||
|
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
|
||||||
|
columns = set()
|
||||||
|
for audit in self:
|
||||||
|
columns.update(audit.keys())
|
||||||
|
|
||||||
|
columns = tuple(sorted(columns, key=sort_key))
|
||||||
|
|
||||||
|
if headers:
|
||||||
|
yield columns
|
||||||
|
|
||||||
|
for audit in self:
|
||||||
|
yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
|
||||||
|
|
||||||
|
def esim_profile_introspect(upp):
|
||||||
|
pes = ProfileElementSequence.from_der(upp.read())
|
||||||
|
d = {}
|
||||||
|
d['upp'] = repr(pes)
|
||||||
|
|
||||||
|
def show_bytes_as_hexdump(item):
|
||||||
|
if isinstance(item, bytes):
|
||||||
|
return b2h(item)
|
||||||
|
if isinstance(item, list):
|
||||||
|
return list(show_bytes_as_hexdump(i) for i in item)
|
||||||
|
if isinstance(item, tuple):
|
||||||
|
return tuple(show_bytes_as_hexdump(i) for i in item)
|
||||||
|
if isinstance(item, dict):
|
||||||
|
d = {}
|
||||||
|
for k, v in item.items():
|
||||||
|
d[k] = show_bytes_as_hexdump(v)
|
||||||
|
return d
|
||||||
|
return item
|
||||||
|
|
||||||
|
l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
|
||||||
|
d['pp'] = pprint.pformat(l, width=120)
|
||||||
|
return d
|
||||||
@@ -0,0 +1,221 @@
|
|||||||
|
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
|
||||||
|
#
|
||||||
|
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||||
|
#
|
||||||
|
# Author: nhofmeyr@sysmocom.de
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import secrets
|
||||||
|
import re
|
||||||
|
from osmocom.utils import b2h
|
||||||
|
|
||||||
|
class ParamSourceExn(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ParamSourceExhaustedExn(ParamSourceExn):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ParamSourceUndefinedExn(ParamSourceExn):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ParamSource:
|
||||||
|
"""abstract parameter source. For usage, see personalization.BatchPersonalization."""
|
||||||
|
|
||||||
|
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
|
||||||
|
name = "none"
|
||||||
|
numeric_base = None # or 10 or 16
|
||||||
|
|
||||||
|
def __init__(self, input_str:str):
|
||||||
|
"""Subclasses should call super().__init__(input_str) before evaluating self.input_str. Each subclass __init__()
|
||||||
|
may in turn manipulate self.input_str to apply expansions or decodings."""
|
||||||
|
self.input_str = input_str
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
"""Subclasses implement this: return the next value from the parameter source.
|
||||||
|
When there are no more values from the source, raise a ParamSourceExhaustedExn.
|
||||||
|
This default implementation is an empty source."""
|
||||||
|
raise ParamSourceExhaustedExn()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_str(cls, input_str:str):
|
||||||
|
"""compatibility with earlier version of ParamSource. Just use the constructor."""
|
||||||
|
return cls(input_str)
|
||||||
|
|
||||||
|
class ConstantSource(ParamSource):
|
||||||
|
"""one value for all"""
|
||||||
|
name = "constant"
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
return self.input_str
|
||||||
|
|
||||||
|
class InputExpandingParamSource(ParamSource):
|
||||||
|
|
||||||
|
def __init__(self, input_str:str):
|
||||||
|
super().__init__(input_str)
|
||||||
|
self.input_str = self.expand_input_str(self.input_str)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def expand_input_str(cls, input_str:str):
|
||||||
|
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||||
|
if "*" not in input_str:
|
||||||
|
return input_str
|
||||||
|
# re: "XX * 123" with optional spaces
|
||||||
|
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", input_str)
|
||||||
|
if len(tokens) < 3:
|
||||||
|
return input_str
|
||||||
|
parts = []
|
||||||
|
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||||
|
parts.append(unchanged)
|
||||||
|
repeat = int(repeat_str)
|
||||||
|
parts.append(snippet * repeat)
|
||||||
|
|
||||||
|
return "".join(parts)
|
||||||
|
|
||||||
|
class DecimalRangeSource(InputExpandingParamSource):
|
||||||
|
"""abstract: decimal numbers with a value range"""
|
||||||
|
|
||||||
|
numeric_base = 10
|
||||||
|
|
||||||
|
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||||
|
"""Constructor to set up values from a (user entered) string: DecimalRangeSource(input_str).
|
||||||
|
Constructor to set up values directly: DecimalRangeSource(num_digits=3, first_value=123, last_value=456)
|
||||||
|
|
||||||
|
num_digits produces leading zeros when first_value..last_value are shorter.
|
||||||
|
"""
|
||||||
|
assert ((input_str is not None and (num_digits, first_value, last_value) == (None, None, None))
|
||||||
|
or (input_str is None and None not in (num_digits, first_value, last_value)))
|
||||||
|
|
||||||
|
if input_str is not None:
|
||||||
|
super().__init__(input_str)
|
||||||
|
|
||||||
|
input_str = self.input_str
|
||||||
|
|
||||||
|
if ".." in input_str:
|
||||||
|
first_str, last_str = input_str.split('..')
|
||||||
|
first_str = first_str.strip()
|
||||||
|
last_str = last_str.strip()
|
||||||
|
else:
|
||||||
|
first_str = input_str.strip()
|
||||||
|
last_str = None
|
||||||
|
|
||||||
|
num_digits = len(first_str)
|
||||||
|
first_value = int(first_str)
|
||||||
|
last_value = int(last_str if last_str is not None else "9" * num_digits)
|
||||||
|
|
||||||
|
assert num_digits > 0
|
||||||
|
assert first_value <= last_value
|
||||||
|
self.num_digits = num_digits
|
||||||
|
self.first_value = first_value
|
||||||
|
self.last_value = last_value
|
||||||
|
|
||||||
|
def val_to_digit(self, val:int):
|
||||||
|
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||||
|
|
||||||
|
class RandomSourceMixin:
|
||||||
|
random_impl = secrets.SystemRandom()
|
||||||
|
|
||||||
|
class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
|
||||||
|
"""return a different sequence of random decimal digits each"""
|
||||||
|
name = "random decimal digits"
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.used_keys = set()
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
# try to generate random digits that are always different from previously produced random digits
|
||||||
|
for _ in range(10):
|
||||||
|
val = self.random_impl.randint(self.first_value, self.last_value)
|
||||||
|
if val not in self.used_keys:
|
||||||
|
break
|
||||||
|
self.used_keys.add(val)
|
||||||
|
return self.val_to_digit(val)
|
||||||
|
|
||||||
|
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||||
|
"""return a different sequence of random hexadecimal digits each"""
|
||||||
|
name = "random hexadecimal digits"
|
||||||
|
numeric_base = 16
|
||||||
|
def __init__(self, input_str:str):
|
||||||
|
super().__init__(input_str)
|
||||||
|
input_str = self.input_str
|
||||||
|
|
||||||
|
num_digits = len(input_str.strip())
|
||||||
|
if num_digits < 1:
|
||||||
|
raise ValueError("zero number of digits")
|
||||||
|
# hex digits always come in two
|
||||||
|
if (num_digits & 1) != 0:
|
||||||
|
raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}")
|
||||||
|
self.num_digits = num_digits
|
||||||
|
self.used_keys = set()
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
# try to generate random bytes that are always different from previously produced random bytes
|
||||||
|
for _ in range(10):
|
||||||
|
val = self.random_impl.randbytes(self.num_digits // 2)
|
||||||
|
if val not in self.used_keys:
|
||||||
|
break
|
||||||
|
self.used_keys.add(val)
|
||||||
|
|
||||||
|
return b2h(val)
|
||||||
|
|
||||||
|
class IncDigitSource(DecimalRangeSource):
|
||||||
|
"""incrementing sequence of digits"""
|
||||||
|
name = "incrementing decimal digits"
|
||||||
|
|
||||||
|
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||||
|
"""input_str: the range of values to iterate. Format: 'FIRST..LAST' (e.g. '0001..9999') or
|
||||||
|
just 'FIRST' (iterates to the maximum value for the given digit width). Leading zeros in
|
||||||
|
FIRST determine the digit width and are preserved in returned values."""
|
||||||
|
super().__init__(input_str, num_digits, first_value, last_value)
|
||||||
|
self.next_val = None
|
||||||
|
self.reset()
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""Restart from the first value of the defined range passed to __init__()."""
|
||||||
|
self.next_val = self.first_value
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
val = self.next_val
|
||||||
|
if val is None:
|
||||||
|
raise ParamSourceExhaustedExn()
|
||||||
|
|
||||||
|
returnval = self.val_to_digit(val)
|
||||||
|
|
||||||
|
val += 1
|
||||||
|
if val > self.last_value:
|
||||||
|
self.next_val = None
|
||||||
|
else:
|
||||||
|
self.next_val = val
|
||||||
|
|
||||||
|
return returnval
|
||||||
|
|
||||||
|
class CsvSource(ParamSource):
|
||||||
|
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
|
||||||
|
name = "from CSV"
|
||||||
|
|
||||||
|
def __init__(self, input_str:str):
|
||||||
|
"""input_str: the CSV column name to read values from.
|
||||||
|
The caller passes the current CSV row to get_next(), from which CsvSource picks the column matching
|
||||||
|
this name."""
|
||||||
|
super().__init__(input_str)
|
||||||
|
self.csv_column = self.input_str
|
||||||
|
|
||||||
|
def get_next(self, csv_row:dict=None):
|
||||||
|
val = None
|
||||||
|
if csv_row:
|
||||||
|
val = csv_row.get(self.csv_column)
|
||||||
|
if val is None:
|
||||||
|
raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}")
|
||||||
|
return val
|
||||||
+555
-106
@@ -16,13 +16,22 @@
|
|||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
import enum
|
||||||
import io
|
import io
|
||||||
from typing import List, Tuple
|
import re
|
||||||
|
from typing import List, Tuple, Generator, Optional
|
||||||
|
|
||||||
from osmocom.tlv import camel_to_snake
|
from osmocom.tlv import camel_to_snake
|
||||||
from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid
|
from osmocom.utils import hexstr
|
||||||
from pySim.esim.saip import ProfileElement, ProfileElementSequence
|
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
|
||||||
from pySim.ts_51_011 import EF_SMSP
|
from pySim.ts_51_011 import EF_SMSP
|
||||||
|
from pySim.esim.saip import param_source
|
||||||
|
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
|
||||||
|
from pySim.esim.saip import SecurityDomainKey, SecurityDomainKeyComponent
|
||||||
|
from pySim.global_platform import KeyUsageQualifier, KeyType
|
||||||
|
|
||||||
|
def unrpad(s: hexstr, c='f') -> hexstr:
|
||||||
|
return hexstr(s.rstrip(c))
|
||||||
|
|
||||||
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
|
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
|
||||||
"""In a list of tuples, remove all tuples whose first part equals 'unwanted_key'."""
|
"""In a list of tuples, remove all tuples whose first part equals 'unwanted_key'."""
|
||||||
@@ -43,7 +52,6 @@ class ClassVarMeta(abc.ABCMeta):
|
|||||||
x = super().__new__(metacls, name, bases, namespace)
|
x = super().__new__(metacls, name, bases, namespace)
|
||||||
for k, v in kwargs.items():
|
for k, v in kwargs.items():
|
||||||
setattr(x, k, v)
|
setattr(x, k, v)
|
||||||
setattr(x, 'name', camel_to_snake(name))
|
|
||||||
return x
|
return x
|
||||||
|
|
||||||
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||||
@@ -63,6 +71,7 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
|||||||
min_len: minimum length of an input str; min_len = 4
|
min_len: minimum length of an input str; min_len = 4
|
||||||
max_len: maximum length of an input str; max_len = 8
|
max_len: maximum length of an input str; max_len = 8
|
||||||
allow_len: permit only specific lengths; allow_len = (8, 16, 32)
|
allow_len: permit only specific lengths; allow_len = (8, 16, 32)
|
||||||
|
numeric_base: indicate hex / decimal, if any; numeric_base = None; numeric_base = 10; numeric_base = 16
|
||||||
|
|
||||||
Subclasses may change the meaning of these by overriding validate_val(), for example that the length counts
|
Subclasses may change the meaning of these by overriding validate_val(), for example that the length counts
|
||||||
resulting bytes instead of a hexstring length. Most subclasses will be covered by the default validate_val().
|
resulting bytes instead of a hexstring length. Most subclasses will be covered by the default validate_val().
|
||||||
@@ -117,6 +126,8 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
|||||||
max_len = None
|
max_len = None
|
||||||
allow_len = None # a list of specific lengths
|
allow_len = None # a list of specific lengths
|
||||||
example_input = None
|
example_input = None
|
||||||
|
default_source = None # a param_source.ParamSource subclass
|
||||||
|
numeric_base = None # or 10 or 16
|
||||||
|
|
||||||
def __init__(self, input_value=None):
|
def __init__(self, input_value=None):
|
||||||
self.input_value = input_value # the raw input value as given by caller
|
self.input_value = input_value # the raw input value as given by caller
|
||||||
@@ -178,19 +189,28 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
|||||||
if cls.allow_chars is not None:
|
if cls.allow_chars is not None:
|
||||||
if any(c not in cls.allow_chars for c in val):
|
if any(c not in cls.allow_chars for c in val):
|
||||||
raise ValueError(f"invalid characters in input value {val!r}, valid chars are {cls.allow_chars}")
|
raise ValueError(f"invalid characters in input value {val!r}, valid chars are {cls.allow_chars}")
|
||||||
|
elif isinstance(val, io.BytesIO):
|
||||||
|
val = val.getvalue()
|
||||||
|
|
||||||
|
if hasattr(val, '__len__'):
|
||||||
|
val_len = len(val)
|
||||||
|
else:
|
||||||
|
# e.g. int length
|
||||||
|
val_len = len(str(val))
|
||||||
|
|
||||||
if cls.allow_len is not None:
|
if cls.allow_len is not None:
|
||||||
l = cls.allow_len
|
l = cls.allow_len
|
||||||
# cls.allow_len could be one int, or a tuple of ints. Wrap a single int also in a tuple.
|
# cls.allow_len could be one int, or a tuple of ints. Wrap a single int also in a tuple.
|
||||||
if not isinstance(l, (tuple, list)):
|
if not isinstance(l, (tuple, list)):
|
||||||
l = (l,)
|
l = (l,)
|
||||||
if len(val) not in l:
|
if val_len not in l:
|
||||||
raise ValueError(f'length must be one of {cls.allow_len}, not {len(val)}: {val!r}')
|
raise ValueError(f'length must be one of {cls.allow_len}, not {val_len}: {val!r}')
|
||||||
if cls.min_len is not None:
|
if cls.min_len is not None:
|
||||||
if len(val) < cls.min_len:
|
if val_len < cls.min_len:
|
||||||
raise ValueError(f'length must be at least {cls.min_len}, not {len(val)}: {val!r}')
|
raise ValueError(f'length must be at least {cls.min_len}, not {val_len}: {val!r}')
|
||||||
if cls.max_len is not None:
|
if cls.max_len is not None:
|
||||||
if len(val) > cls.max_len:
|
if val_len > cls.max_len:
|
||||||
raise ValueError(f'length must be at most {cls.max_len}, not {len(val)}: {val!r}')
|
raise ValueError(f'length must be at most {cls.max_len}, not {val_len}: {val!r}')
|
||||||
return val
|
return val
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -199,6 +219,49 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
|||||||
Write the given val in the right format in all the right places in pes."""
|
Write the given val in the right format in all the right places in pes."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_value_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
"""Same as get_values_from_pes() but expecting a single value.
|
||||||
|
get_values_from_pes() may return values like this:
|
||||||
|
[{ 'AlgorithmID': 'Milenage' }, { 'AlgorithmID': 'Milenage' }]
|
||||||
|
This ensures that all these entries are identical and would return only
|
||||||
|
{ 'AlgorithmID': 'Milenage' }.
|
||||||
|
|
||||||
|
This is relevant for any profile element that may appear multiple times in the same PES (only a few),
|
||||||
|
where each occurrence should reflect the same value (all currently known parameters).
|
||||||
|
"""
|
||||||
|
|
||||||
|
val = None
|
||||||
|
for v in cls.get_values_from_pes(pes):
|
||||||
|
if val is None:
|
||||||
|
val = v
|
||||||
|
elif val != v:
|
||||||
|
raise ValueError(f'get_value_from_pes(): got distinct values: {val!r} != {v!r}')
|
||||||
|
return val
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence) -> Generator:
|
||||||
|
"""This is what subclasses implement: yield all values from a decoded profile package.
|
||||||
|
Find all values in the pes, and yield them decoded to a valid cls.input_value format.
|
||||||
|
Should be a generator function, i.e. use 'yield' instead of 'return'.
|
||||||
|
|
||||||
|
Yielded value must be a dict(). Usually, an implementation will return only one key, like
|
||||||
|
|
||||||
|
{ "ICCID": "1234567890123456789" }
|
||||||
|
|
||||||
|
Some implementations have more than one value to return, like
|
||||||
|
|
||||||
|
{ "IMSI": "00101012345678", "IMSI-ACC" : "5" }
|
||||||
|
|
||||||
|
Implementation example:
|
||||||
|
|
||||||
|
for pe in pes:
|
||||||
|
if my_condition(pe):
|
||||||
|
yield { cls.name: b2h(my_bin_value_from(pe)) }
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_len_range(cls):
|
def get_len_range(cls):
|
||||||
"""considering all of min_len, max_len and allow_len, get a tuple of the resulting (min, max) of permitted
|
"""considering all of min_len, max_len and allow_len, get a tuple of the resulting (min, max) of permitted
|
||||||
@@ -219,6 +282,20 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
|||||||
return (None, None)
|
return (None, None)
|
||||||
return (min(vals), max(vals))
|
return (min(vals), max(vals))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_typical_input_len(cls):
|
||||||
|
'''return a good length to use as the visible width of a user interface input field.
|
||||||
|
May be overridden by subclasses.
|
||||||
|
This default implementation returns the maximum allowed value length -- a good fit for most subclasses.
|
||||||
|
'''
|
||||||
|
return cls.get_len_range()[1] or 16
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def is_super_of(cls, other_class):
|
||||||
|
try:
|
||||||
|
return issubclass(other_class, cls)
|
||||||
|
except TypeError:
|
||||||
|
return False
|
||||||
|
|
||||||
class DecimalParam(ConfigurableParameter):
|
class DecimalParam(ConfigurableParameter):
|
||||||
"""Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of
|
"""Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of
|
||||||
@@ -226,6 +303,7 @@ class DecimalParam(ConfigurableParameter):
|
|||||||
"""
|
"""
|
||||||
allow_types = (str, int)
|
allow_types = (str, int)
|
||||||
allow_chars = '0123456789'
|
allow_chars = '0123456789'
|
||||||
|
numeric_base = 10
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def validate_val(cls, val):
|
||||||
@@ -249,6 +327,7 @@ class DecimalHexParam(DecimalParam):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def validate_val(cls, val):
|
||||||
val = super().validate_val(val)
|
val = super().validate_val(val)
|
||||||
|
assert isinstance(val, str)
|
||||||
val = ''.join('%02x' % ord(x) for x in val)
|
val = ''.join('%02x' % ord(x) for x in val)
|
||||||
if cls.rpad is not None:
|
if cls.rpad is not None:
|
||||||
c = cls.rpad_char
|
c = cls.rpad_char
|
||||||
@@ -256,9 +335,21 @@ class DecimalHexParam(DecimalParam):
|
|||||||
# a DecimalHexParam subclass expects the apply_val() input to be a bytes instance ready for the pes
|
# a DecimalHexParam subclass expects the apply_val() input to be a bytes instance ready for the pes
|
||||||
return h2b(val)
|
return h2b(val)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def decimal_hex_to_str(cls, val):
|
||||||
|
"""useful for get_values_from_pes() implementations of subclasses"""
|
||||||
|
if isinstance(val, bytes):
|
||||||
|
val = b2h(val)
|
||||||
|
assert isinstance(val, hexstr)
|
||||||
|
if cls.rpad is not None:
|
||||||
|
c = cls.rpad_char or 'f'
|
||||||
|
val = unrpad(val, c)
|
||||||
|
return val.to_bytes().decode('ascii')
|
||||||
|
|
||||||
class IntegerParam(ConfigurableParameter):
|
class IntegerParam(ConfigurableParameter):
|
||||||
allow_types = (str, int)
|
allow_types = (str, int)
|
||||||
allow_chars = '0123456789'
|
allow_chars = '0123456789'
|
||||||
|
numeric_base = 10
|
||||||
|
|
||||||
# two integers, if the resulting int should be range limited
|
# two integers, if the resulting int should be range limited
|
||||||
min_val = None
|
min_val = None
|
||||||
@@ -279,14 +370,28 @@ class IntegerParam(ConfigurableParameter):
|
|||||||
raise ValueError(f'Value {val} is out of range, must be [{cls.min_val}..{cls.max_val}]')
|
raise ValueError(f'Value {val} is out of range, must be [{cls.min_val}..{cls.max_val}]')
|
||||||
return val
|
return val
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
for valdict in super().get_values_from_pes(pes):
|
||||||
|
for key, val in valdict.items():
|
||||||
|
if isinstance(val, int):
|
||||||
|
valdict[key] = str(val)
|
||||||
|
yield valdict
|
||||||
|
|
||||||
class BinaryParam(ConfigurableParameter):
|
class BinaryParam(ConfigurableParameter):
|
||||||
allow_types = (str, io.BytesIO, bytes, bytearray)
|
allow_types = (str, io.BytesIO, bytes, bytearray, int)
|
||||||
allow_chars = '0123456789abcdefABCDEF'
|
allow_chars = '0123456789abcdefABCDEF'
|
||||||
strip_chars = ' \t\r\n'
|
strip_chars = ' \t\r\n'
|
||||||
|
numeric_base = 16
|
||||||
|
default_source = param_source.RandomHexDigitSource
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def validate_val(cls, val):
|
||||||
# take care that min_len and max_len are applied to the binary length by converting to bytes first
|
# take care that min_len and max_len are applied to the binary length by converting to bytes first
|
||||||
|
if isinstance(val, int):
|
||||||
|
min_len, _max_len = cls.get_len_range()
|
||||||
|
val = '%0*d' % (min_len, val)
|
||||||
|
|
||||||
if isinstance(val, str):
|
if isinstance(val, str):
|
||||||
if cls.strip_chars is not None:
|
if cls.strip_chars is not None:
|
||||||
val = ''.join(c for c in val if c not in cls.strip_chars)
|
val = ''.join(c for c in val if c not in cls.strip_chars)
|
||||||
@@ -301,6 +406,82 @@ class BinaryParam(ConfigurableParameter):
|
|||||||
val = super().validate_val(val)
|
val = super().validate_val(val)
|
||||||
return bytes(val)
|
return bytes(val)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_typical_input_len(cls):
|
||||||
|
# override to return twice the length, because of hex digits.
|
||||||
|
min_len, max_len = cls.get_len_range()
|
||||||
|
if max_len is None:
|
||||||
|
return None
|
||||||
|
# two hex characters per value octet.
|
||||||
|
# (maybe *3 to also allow for spaces?)
|
||||||
|
return max_len * 2
|
||||||
|
|
||||||
|
|
||||||
|
class EnumParam(ConfigurableParameter):
|
||||||
|
"""ConfigurableParameter for named integer enumeration values.
|
||||||
|
|
||||||
|
Subclasses must define a nested enum.IntEnum named 'Values' listing all valid names and their
|
||||||
|
integer codes. apply_val() and get_values_from_pes() are not implemented here and this must
|
||||||
|
be inherited from another mixin."""
|
||||||
|
|
||||||
|
class Values(enum.IntEnum):
|
||||||
|
pass # subclasses override this
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def validate_val(cls, val) -> int:
|
||||||
|
if isinstance(val, int):
|
||||||
|
try:
|
||||||
|
return int(cls.Values(val))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
elif isinstance(val, str):
|
||||||
|
member = cls.map_name_to_val(val, strict=False)
|
||||||
|
if member is not None:
|
||||||
|
return member
|
||||||
|
|
||||||
|
valid = ', '.join(m.name for m in cls.Values)
|
||||||
|
raise ValueError(f"{cls.get_name()}: invalid argument: {val!r}. Valid arguments are: {valid}")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def map_name_to_val(cls, name: str, strict=True) -> int:
|
||||||
|
"""Return the integer value for a given enum member name. Performs an exact match first,
|
||||||
|
then falls back to fuzzy matching (case-insensitive, punctuation-insensitive)."""
|
||||||
|
try:
|
||||||
|
return int(cls.Values[name])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
clean = cls.clean_name_str(name)
|
||||||
|
for member in cls.Values:
|
||||||
|
if cls.clean_name_str(member.name) == clean:
|
||||||
|
return int(member)
|
||||||
|
|
||||||
|
if strict:
|
||||||
|
valid = ', '.join(m.name for m in cls.Values)
|
||||||
|
raise ValueError(f"{cls.get_name()}: {name!r} is not a known value. Known values are: {valid}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def map_val_to_name(cls, val, strict=False) -> str:
|
||||||
|
"""Return the enum member name for a given integer value."""
|
||||||
|
try:
|
||||||
|
return cls.Values(val).name
|
||||||
|
except ValueError:
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"{cls.get_name()}: {val!r} ({type(val).__name__}) is not a known value.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def name_normalize(cls, name: str) -> str:
|
||||||
|
"""Map a (possibly fuzzy) name to its canonical enum member name."""
|
||||||
|
return cls.Values(cls.map_name_to_val(name)).name
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def clean_name_str(cls, val: str) -> str:
|
||||||
|
"""Strip punctuation and case for fuzzy name comparison.
|
||||||
|
Treats hyphens and underscores as equivalent (both removed)."""
|
||||||
|
return re.sub('[^0-9A-Za-z]', '', val).lower()
|
||||||
|
|
||||||
|
|
||||||
class Iccid(DecimalParam):
|
class Iccid(DecimalParam):
|
||||||
"""ICCID Parameter. Input: string of decimal digits.
|
"""ICCID Parameter. Input: string of decimal digits.
|
||||||
@@ -309,6 +490,7 @@ class Iccid(DecimalParam):
|
|||||||
min_len = 18
|
min_len = 18
|
||||||
max_len = 20
|
max_len = 20
|
||||||
example_input = '998877665544332211'
|
example_input = '998877665544332211'
|
||||||
|
default_source = param_source.IncDigitSource
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def validate_val(cls, val):
|
||||||
@@ -322,6 +504,17 @@ class Iccid(DecimalParam):
|
|||||||
# patch MF/EF.ICCID
|
# patch MF/EF.ICCID
|
||||||
file_replace_content(pes.get_pe_for_type('mf').decoded['ef-iccid'], h2b(enc_iccid(val)))
|
file_replace_content(pes.get_pe_for_type('mf').decoded['ef-iccid'], h2b(enc_iccid(val)))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
padded = b2h(pes.get_pe_for_type('header').decoded['iccid'])
|
||||||
|
iccid = unrpad(padded)
|
||||||
|
yield { cls.name: iccid }
|
||||||
|
|
||||||
|
for pe in pes.get_pes_for_type('mf'):
|
||||||
|
iccid_f = pe.files.get('ef-iccid', None)
|
||||||
|
if iccid_f is not None:
|
||||||
|
yield { cls.name: dec_iccid(b2h(iccid_f.body)) }
|
||||||
|
|
||||||
class Imsi(DecimalParam):
|
class Imsi(DecimalParam):
|
||||||
"""Configurable IMSI. Expects value to be a string of digits. Automatically sets the ACC to
|
"""Configurable IMSI. Expects value to be a string of digits. Automatically sets the ACC to
|
||||||
the last digit of the IMSI."""
|
the last digit of the IMSI."""
|
||||||
@@ -330,6 +523,7 @@ class Imsi(DecimalParam):
|
|||||||
min_len = 6
|
min_len = 6
|
||||||
max_len = 15
|
max_len = 15
|
||||||
example_input = '00101' + ('0' * 10)
|
example_input = '00101' + ('0' * 10)
|
||||||
|
default_source = param_source.IncDigitSource
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||||
@@ -342,6 +536,18 @@ class Imsi(DecimalParam):
|
|||||||
file_replace_content(pe.decoded['ef-acc'], acc.to_bytes(2, 'big'))
|
file_replace_content(pe.decoded['ef-acc'], acc.to_bytes(2, 'big'))
|
||||||
# TODO: DF.GSM_ACCESS if not linked?
|
# TODO: DF.GSM_ACCESS if not linked?
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
for pe in pes.get_pes_for_type('usim'):
|
||||||
|
imsi_f = pe.files.get('ef-imsi', None)
|
||||||
|
acc_f = pe.files.get('ef-acc', None)
|
||||||
|
y = {}
|
||||||
|
if imsi_f:
|
||||||
|
y[cls.name] = dec_imsi(b2h(imsi_f.body))
|
||||||
|
if acc_f:
|
||||||
|
y[cls.name + '-ACC'] = b2h(acc_f.body)
|
||||||
|
yield y
|
||||||
|
|
||||||
class SmspTpScAddr(ConfigurableParameter):
|
class SmspTpScAddr(ConfigurableParameter):
|
||||||
"""Configurable SMSC (SMS Service Centre) TP-SC-ADDR. Expects to be a phone number in national or
|
"""Configurable SMSC (SMS Service Centre) TP-SC-ADDR. Expects to be a phone number in national or
|
||||||
international format (designated by a leading +). Automatically sets the NPI to E.164 and the TON based on
|
international format (designated by a leading +). Automatically sets the NPI to E.164 and the TON based on
|
||||||
@@ -350,25 +556,45 @@ class SmspTpScAddr(ConfigurableParameter):
|
|||||||
name = 'SMSP-TP-SC-ADDR'
|
name = 'SMSP-TP-SC-ADDR'
|
||||||
allow_chars = '+0123456789'
|
allow_chars = '+0123456789'
|
||||||
strip_chars = ' \t\r\n'
|
strip_chars = ' \t\r\n'
|
||||||
|
numeric_base = 10
|
||||||
max_len = 21 # '+' and 20 digits
|
max_len = 21 # '+' and 20 digits
|
||||||
min_len = 1
|
min_len = 1
|
||||||
example_input = '+49301234567'
|
example_input = '+49301234567'
|
||||||
|
default_source = param_source.ConstantSource
|
||||||
|
|
||||||
@classmethod
|
@staticmethod
|
||||||
def validate_val(cls, val):
|
def str_to_tuple(addr_str):
|
||||||
val = super().validate_val(val)
|
|
||||||
addr_str = str(val)
|
|
||||||
if addr_str[0] == '+':
|
if addr_str[0] == '+':
|
||||||
digits = addr_str[1:]
|
digits = addr_str[1:]
|
||||||
international = True
|
international = True
|
||||||
else:
|
else:
|
||||||
digits = addr_str
|
digits = addr_str
|
||||||
international = False
|
international = False
|
||||||
|
return (international, digits)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def tuple_to_str(addr_tuple):
|
||||||
|
international, digits = addr_tuple
|
||||||
|
if international:
|
||||||
|
ret = '+'
|
||||||
|
else:
|
||||||
|
ret = ''
|
||||||
|
ret += digits
|
||||||
|
return ret
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def validate_val(cls, val):
|
||||||
|
val = super().validate_val(val)
|
||||||
|
|
||||||
|
addr_tuple = cls.str_to_tuple(str(val))
|
||||||
|
|
||||||
|
international, digits = addr_tuple
|
||||||
if len(digits) > 20:
|
if len(digits) > 20:
|
||||||
raise ValueError(f'TP-SC-ADDR must not exceed 20 digits: {digits!r}')
|
raise ValueError(f'TP-SC-ADDR must not exceed 20 digits: {digits!r}')
|
||||||
if not digits.isdecimal():
|
if not digits.isdecimal():
|
||||||
raise ValueError(f'TP-SC-ADDR must only contain decimal digits: {digits!r}')
|
raise ValueError(f'TP-SC-ADDR must only contain decimal digits: {digits!r}')
|
||||||
return (international, digits)
|
|
||||||
|
return addr_tuple
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||||
@@ -392,98 +618,252 @@ class SmspTpScAddr(ConfigurableParameter):
|
|||||||
ef_smsp_dec['tp_sc_addr']['ton_npi']['type_of_number'] = 'international' if international else 'unknown'
|
ef_smsp_dec['tp_sc_addr']['ton_npi']['type_of_number'] = 'international' if international else 'unknown'
|
||||||
# ensure the parameter_indicators.tp_sc_addr is True
|
# ensure the parameter_indicators.tp_sc_addr is True
|
||||||
ef_smsp_dec['parameter_indicators']['tp_sc_addr'] = True
|
ef_smsp_dec['parameter_indicators']['tp_sc_addr'] = True
|
||||||
# re-encode into the File body
|
|
||||||
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1)
|
# alpha_id padding: to make room for a human readable SMSC name that can be provisioned to the profile later
|
||||||
|
# on, alpha_id needs to be empty but padded 0xff to some length.
|
||||||
|
# - alpha_id is optional, setting alpha_id = '' ensures the IE is present.
|
||||||
|
# - the length of the file is 28+Y where Y is the length of the alpha_id -- here the intended length of our padding
|
||||||
|
# (see 3GPP TS 31.102 4.2.27 EF.SMSP). So if we want a maximum length of alpha_id = 14, we set the total
|
||||||
|
# file size to 28+14 = 42.
|
||||||
|
# - this file size has to go in two places: encode_record_bin() needs to know the length to encode the right
|
||||||
|
# length of fillFileContent.
|
||||||
|
# - the f_smsp needs to show the right file size in the PES, as in
|
||||||
|
# 'ef-smsp': [('fileDescriptor', {'efFileSize': '2a', ...
|
||||||
|
# (where 2a == 42)
|
||||||
|
# - To generate the right amount of fillFileContent, pass total_len=42 to encode_record_bin().
|
||||||
|
# - To show the right size in the PES, set f_smsp.rec_len = 42
|
||||||
|
ef_smsp_dec['alpha_id'] = ''
|
||||||
|
f_smsp.rec_len = 42
|
||||||
|
|
||||||
|
# re-encode into the File body.
|
||||||
|
#
|
||||||
#print("SMSP (new): %s" % f_smsp.body)
|
#print("SMSP (new): %s" % f_smsp.body)
|
||||||
# re-generate the pe.decoded member from the File instance
|
# re-generate the pe.decoded member from the File instance
|
||||||
|
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1, total_len=f_smsp.rec_len)
|
||||||
pe.file2pe(f_smsp)
|
pe.file2pe(f_smsp)
|
||||||
|
|
||||||
class SdKey(BinaryParam, metaclass=ClassVarMeta):
|
@classmethod
|
||||||
"""Configurable Security Domain (SD) Key. Value is presented as bytes."""
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
for pe in pes.get_pes_for_type('usim'):
|
||||||
|
f_smsp = pe.files['ef-smsp']
|
||||||
|
ef_smsp = EF_SMSP()
|
||||||
|
ef_smsp_dec = ef_smsp.decode_record_bin(f_smsp.body, 1)
|
||||||
|
|
||||||
|
tp_sc_addr = ef_smsp_dec.get('tp_sc_addr', None)
|
||||||
|
|
||||||
|
digits = tp_sc_addr.get('call_number', None)
|
||||||
|
|
||||||
|
ton_npi = tp_sc_addr.get('ton_npi', None)
|
||||||
|
international = ton_npi.get('type_of_number', None)
|
||||||
|
international = (international == 'international')
|
||||||
|
|
||||||
|
yield { cls.name: cls.tuple_to_str((international, digits)) }
|
||||||
|
|
||||||
|
|
||||||
|
class SdKey(BinaryParam):
|
||||||
|
"""Configurable Security Domain (SD) Key. Value is presented as bytes.
|
||||||
|
Non-abstract implementations are generated in SdKey.generate_sd_key_classes"""
|
||||||
# these will be set by subclasses
|
# these will be set by subclasses
|
||||||
key_type = None
|
key_type = None
|
||||||
key_id = None
|
|
||||||
kvn = None
|
kvn = None
|
||||||
|
key_id = None
|
||||||
key_usage_qual = None
|
key_usage_qual = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _apply_sd(cls, pe: ProfileElement, value):
|
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||||
assert pe.type == 'securityDomain'
|
set_components = [ SecurityDomainKeyComponent(cls.key_type, val) ]
|
||||||
for key in pe.decoded['keyList']:
|
|
||||||
if key['keyIdentifier'][0] == cls.key_id and key['keyVersionNumber'][0] == cls.kvn:
|
for pe in pes.pe_list:
|
||||||
assert len(key['keyComponents']) == 1
|
if pe.type != 'securityDomain':
|
||||||
key['keyComponents'][0]['keyData'] = value
|
continue
|
||||||
return
|
assert isinstance(pe, ProfileElementSD)
|
||||||
# Could not find matching key to patch, create a new one
|
|
||||||
key = {
|
key = pe.find_key(key_version_number=cls.kvn, key_id=cls.key_id)
|
||||||
'keyUsageQualifier': bytes([cls.key_usage_qual]),
|
if not key:
|
||||||
'keyIdentifier': bytes([cls.key_id]),
|
# Could not find matching key to patch, create a new one
|
||||||
'keyVersionNumber': bytes([cls.kvn]),
|
key = SecurityDomainKey(
|
||||||
'keyComponents': [
|
key_version_number=cls.kvn,
|
||||||
{ 'keyType': bytes([cls.key_type]), 'keyData': value },
|
key_id=cls.key_id,
|
||||||
]
|
key_usage_qualifier=cls.key_usage_qual,
|
||||||
}
|
key_components=set_components,
|
||||||
pe.decoded['keyList'].append(key)
|
)
|
||||||
|
pe.add_key(key)
|
||||||
|
else:
|
||||||
|
# A key of this KVN and ID already exists in the profile.
|
||||||
|
|
||||||
|
# Keep the key_usage_qualifier as it was in the profile, so skip this here:
|
||||||
|
# key.key_usage_qualifier = cls.key_usage_qual
|
||||||
|
|
||||||
|
key.key_components = set_components
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def apply_val(cls, pes: ProfileElementSequence, value):
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
for pe in pes.get_pes_for_type('securityDomain'):
|
for pe in pes.pe_list:
|
||||||
cls._apply_sd(pe, value)
|
if pe.type != 'securityDomain':
|
||||||
|
continue
|
||||||
|
assert isinstance(pe, ProfileElementSD)
|
||||||
|
|
||||||
class SdKeyScp80_01(SdKey, kvn=0x01, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
key = pe.find_key(key_version_number=cls.kvn, key_id=cls.key_id)
|
||||||
pass
|
if not key:
|
||||||
class SdKeyScp80_01Kic(SdKeyScp80_01, key_id=0x01, key_usage_qual=0x18): # FIXME: ordering?
|
continue
|
||||||
pass
|
kc = key.get_key_component(cls.key_type)
|
||||||
class SdKeyScp80_01Kid(SdKeyScp80_01, key_id=0x02, key_usage_qual=0x14):
|
if kc:
|
||||||
pass
|
yield { cls.name: b2h(kc) }
|
||||||
class SdKeyScp80_01Kik(SdKeyScp80_01, key_id=0x03, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class SdKeyScp81_01(SdKey, kvn=0x81): # FIXME
|
|
||||||
pass
|
|
||||||
class SdKeyScp81_01Psk(SdKeyScp81_01, key_id=0x01, key_type=0x85, key_usage_qual=0x3C):
|
|
||||||
pass
|
|
||||||
class SdKeyScp81_01Dek(SdKeyScp81_01, key_id=0x02, key_type=0x88, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class SdKeyScp02_20(SdKey, kvn=0x20, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
|
||||||
pass
|
|
||||||
class SdKeyScp02_20Enc(SdKeyScp02_20, key_id=0x01, key_usage_qual=0x18):
|
|
||||||
pass
|
|
||||||
class SdKeyScp02_20Mac(SdKeyScp02_20, key_id=0x02, key_usage_qual=0x14):
|
|
||||||
pass
|
|
||||||
class SdKeyScp02_20Dek(SdKeyScp02_20, key_id=0x03, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class SdKeyScp03_30(SdKey, kvn=0x30, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_30Enc(SdKeyScp03_30, key_id=0x01, key_usage_qual=0x18):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_30Mac(SdKeyScp03_30, key_id=0x02, key_usage_qual=0x14):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_30Dek(SdKeyScp03_30, key_id=0x03, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class SdKeyScp03_31(SdKey, kvn=0x31, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_31Enc(SdKeyScp03_31, key_id=0x01, key_usage_qual=0x18):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_31Mac(SdKeyScp03_31, key_id=0x02, key_usage_qual=0x14):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_31Dek(SdKeyScp03_31, key_id=0x03, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class SdKeyScp03_32(SdKey, kvn=0x32, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_32Enc(SdKeyScp03_32, key_id=0x01, key_usage_qual=0x18):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_32Mac(SdKeyScp03_32, key_id=0x02, key_usage_qual=0x14):
|
|
||||||
pass
|
|
||||||
class SdKeyScp03_32Dek(SdKeyScp03_32, key_id=0x03, key_usage_qual=0x48):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
|
LEN_128 = (16,)
|
||||||
|
LEN_128_192_256 = (16, 24, 32)
|
||||||
|
LEN_128_256 = (16, 32)
|
||||||
|
|
||||||
|
DES = ('DES', dict(key_type=KeyType.des, allow_len=LEN_128) )
|
||||||
|
AES = ('AES', dict(key_type=KeyType.aes, allow_len=LEN_128_192_256) )
|
||||||
|
|
||||||
|
ENC = ('ENC', dict(key_id=0x01, key_usage_qual=0x18) )
|
||||||
|
MAC = ('MAC', dict(key_id=0x02, key_usage_qual=0x14) )
|
||||||
|
DEK = ('DEK', dict(key_id=0x03, key_usage_qual=0x48) )
|
||||||
|
|
||||||
|
TLSPSK_PSK = ('TLSPSK', dict(key_type=KeyType.tls_psk, key_id=0x01, key_usage_qual=0x3c, allow_len=LEN_128_192_256) )
|
||||||
|
TLSPSK_DEK = ('DEK', dict(key_id=0x02, key_usage_qual=0x48) )
|
||||||
|
|
||||||
|
# THIS IS THE LIST that controls which SdKeyXxx subclasses exist:
|
||||||
|
SD_KEY_DEFS = (
|
||||||
|
# name KVN x variants x variants
|
||||||
|
('SCP02', (0x20, 0x21, 0x22, 0xff), (AES, ), (ENC, MAC, DEK) ),
|
||||||
|
('SCP03', (0x30, 0x31, 0x32), (AES, ), (ENC, MAC, DEK) ),
|
||||||
|
('SCP80', (0x01, 0x02, 0x03), (DES, AES), (ENC, MAC, DEK) ),
|
||||||
|
|
||||||
|
# key_id=1
|
||||||
|
('SCP81', (0x40, 0x41, 0x42), (TLSPSK_PSK, ), ),
|
||||||
|
# key_id=2
|
||||||
|
('SCP81', (0x40, 0x41, 0x42), (DES, AES), (TLSPSK_DEK, ) ),
|
||||||
|
)
|
||||||
|
|
||||||
|
all_implementations = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def generate_sd_key_classes(cls, sd_key_defs=SD_KEY_DEFS):
|
||||||
|
'''This generates python classes to be exported in this module, as subclasses of class SdKey.
|
||||||
|
|
||||||
|
We create SdKey subclasses dynamically from a list.
|
||||||
|
You can list all of them via:
|
||||||
|
from pySim.esim.saip.personalization import SdKey
|
||||||
|
SdKey.all_implementations
|
||||||
|
or
|
||||||
|
print('\n'.join(sorted(f'{x.__name__}\t{x.name}' for x in SdKey.all_implementations)))
|
||||||
|
|
||||||
|
at time of writing this comment, this prints:
|
||||||
|
|
||||||
|
SdKeyScp02Kvn20AesDek SCP02-KVN20-AES-DEK
|
||||||
|
SdKeyScp02Kvn20AesEnc SCP02-KVN20-AES-ENC
|
||||||
|
SdKeyScp02Kvn20AesMac SCP02-KVN20-AES-MAC
|
||||||
|
SdKeyScp02Kvn21AesDek SCP02-KVN21-AES-DEK
|
||||||
|
SdKeyScp02Kvn21AesEnc SCP02-KVN21-AES-ENC
|
||||||
|
SdKeyScp02Kvn21AesMac SCP02-KVN21-AES-MAC
|
||||||
|
SdKeyScp02Kvn22AesDek SCP02-KVN22-AES-DEK
|
||||||
|
SdKeyScp02Kvn22AesEnc SCP02-KVN22-AES-ENC
|
||||||
|
SdKeyScp02Kvn22AesMac SCP02-KVN22-AES-MAC
|
||||||
|
SdKeyScp02KvnffAesDek SCP02-KVNff-AES-DEK
|
||||||
|
SdKeyScp02KvnffAesEnc SCP02-KVNff-AES-ENC
|
||||||
|
SdKeyScp02KvnffAesMac SCP02-KVNff-AES-MAC
|
||||||
|
SdKeyScp03Kvn30AesDek SCP03-KVN30-AES-DEK
|
||||||
|
SdKeyScp03Kvn30AesEnc SCP03-KVN30-AES-ENC
|
||||||
|
SdKeyScp03Kvn30AesMac SCP03-KVN30-AES-MAC
|
||||||
|
SdKeyScp03Kvn31AesDek SCP03-KVN31-AES-DEK
|
||||||
|
SdKeyScp03Kvn31AesEnc SCP03-KVN31-AES-ENC
|
||||||
|
SdKeyScp03Kvn31AesMac SCP03-KVN31-AES-MAC
|
||||||
|
SdKeyScp03Kvn32AesDek SCP03-KVN32-AES-DEK
|
||||||
|
SdKeyScp03Kvn32AesEnc SCP03-KVN32-AES-ENC
|
||||||
|
SdKeyScp03Kvn32AesMac SCP03-KVN32-AES-MAC
|
||||||
|
SdKeyScp80Kvn01AesDek SCP80-KVN01-AES-DEK
|
||||||
|
SdKeyScp80Kvn01AesEnc SCP80-KVN01-AES-ENC
|
||||||
|
SdKeyScp80Kvn01AesMac SCP80-KVN01-AES-MAC
|
||||||
|
SdKeyScp80Kvn01DesDek SCP80-KVN01-DES-DEK
|
||||||
|
SdKeyScp80Kvn01DesEnc SCP80-KVN01-DES-ENC
|
||||||
|
SdKeyScp80Kvn01DesMac SCP80-KVN01-DES-MAC
|
||||||
|
SdKeyScp80Kvn02AesDek SCP80-KVN02-AES-DEK
|
||||||
|
SdKeyScp80Kvn02AesEnc SCP80-KVN02-AES-ENC
|
||||||
|
SdKeyScp80Kvn02AesMac SCP80-KVN02-AES-MAC
|
||||||
|
SdKeyScp80Kvn02DesDek SCP80-KVN02-DES-DEK
|
||||||
|
SdKeyScp80Kvn02DesEnc SCP80-KVN02-DES-ENC
|
||||||
|
SdKeyScp80Kvn02DesMac SCP80-KVN02-DES-MAC
|
||||||
|
SdKeyScp80Kvn03AesDek SCP80-KVN03-AES-DEK
|
||||||
|
SdKeyScp80Kvn03AesEnc SCP80-KVN03-AES-ENC
|
||||||
|
SdKeyScp80Kvn03AesMac SCP80-KVN03-AES-MAC
|
||||||
|
SdKeyScp80Kvn03DesDek SCP80-KVN03-DES-DEK
|
||||||
|
SdKeyScp80Kvn03DesEnc SCP80-KVN03-DES-ENC
|
||||||
|
SdKeyScp80Kvn03DesMac SCP80-KVN03-DES-MAC
|
||||||
|
SdKeyScp81Kvn40AesDek SCP81-KVN40-AES-DEK
|
||||||
|
SdKeyScp81Kvn40DesDek SCP81-KVN40-DES-DEK
|
||||||
|
SdKeyScp81Kvn40Tlspsk SCP81-KVN40-TLSPSK
|
||||||
|
SdKeyScp81Kvn41AesDek SCP81-KVN41-AES-DEK
|
||||||
|
SdKeyScp81Kvn41DesDek SCP81-KVN41-DES-DEK
|
||||||
|
SdKeyScp81Kvn41Tlspsk SCP81-KVN41-TLSPSK
|
||||||
|
SdKeyScp81Kvn42AesDek SCP81-KVN42-AES-DEK
|
||||||
|
SdKeyScp81Kvn42DesDek SCP81-KVN42-DES-DEK
|
||||||
|
SdKeyScp81Kvn42Tlspsk SCP81-KVN42-TLSPSK
|
||||||
|
'''
|
||||||
|
|
||||||
|
SdKey.all_implementations = []
|
||||||
|
|
||||||
|
def camel(s):
|
||||||
|
return s[:1].upper() + s[1:].lower()
|
||||||
|
|
||||||
|
def do_variants(name, kvn, remaining_variants, labels=[], attrs={}):
|
||||||
|
'recurse to unfold as many variants as there may be'
|
||||||
|
if remaining_variants:
|
||||||
|
# not a leaf node, collect more labels and attrs
|
||||||
|
variants = remaining_variants[0]
|
||||||
|
remaining_variants = remaining_variants[1:]
|
||||||
|
|
||||||
|
for label, valdict in variants:
|
||||||
|
# pass copies to recursion
|
||||||
|
inner_labels = list(labels)
|
||||||
|
inner_attrs = dict(attrs)
|
||||||
|
|
||||||
|
inner_labels.append(label)
|
||||||
|
inner_attrs.update(valdict)
|
||||||
|
do_variants(name, kvn, remaining_variants,
|
||||||
|
labels=inner_labels,
|
||||||
|
attrs=inner_attrs)
|
||||||
|
return
|
||||||
|
|
||||||
|
# leaf node. create a new class with all the accumulated vals
|
||||||
|
parts = [name, f'KVN{kvn:02x}',] + labels
|
||||||
|
cls_label = '-'.join(p for p in parts if p)
|
||||||
|
|
||||||
|
parts = ['Sd', 'Key', name, f'Kvn{kvn:02x}'] + labels
|
||||||
|
clsname = ''.join(camel(p) for p in parts)
|
||||||
|
|
||||||
|
max_key_len = attrs.get('allow_len')[-1]
|
||||||
|
|
||||||
|
attrs.update({
|
||||||
|
'name' : cls_label,
|
||||||
|
'kvn': kvn,
|
||||||
|
'example_input': f'00*{max_key_len}',
|
||||||
|
})
|
||||||
|
|
||||||
|
# below line is like
|
||||||
|
# class SdKeyScpNNKvnXXYyyZzz(SdKey):
|
||||||
|
# <set attrs>
|
||||||
|
cls_def = type(clsname, (cls,), attrs)
|
||||||
|
|
||||||
|
# for some unknown reason, subclassing from abc.ABC makes cls_def.__module__ == 'abc',
|
||||||
|
# but we don't want 'abc.SdKeyScp03Kvn32AesEnc'.
|
||||||
|
# Make sure it is 'pySim.esim.saip.personalization.SdKeyScp03Kvn32AesEnc'
|
||||||
|
cls_def.__module__ = __name__
|
||||||
|
|
||||||
|
globals()[clsname] = cls_def
|
||||||
|
SdKey.all_implementations.append(cls_def)
|
||||||
|
|
||||||
|
|
||||||
|
for items in sd_key_defs:
|
||||||
|
name, kvns = items[:2]
|
||||||
|
variants = items[2:]
|
||||||
|
for kvn in kvns:
|
||||||
|
do_variants(name, kvn, variants)
|
||||||
|
|
||||||
|
# this creates all of the classes named like SdKeyScp02Kvn20AesDek to be published in this python module:
|
||||||
|
SdKey.generate_sd_key_classes()
|
||||||
|
|
||||||
def obtain_all_pe_from_pelist(l: List[ProfileElement], wanted_type: str) -> ProfileElement:
|
def obtain_all_pe_from_pelist(l: List[ProfileElement], wanted_type: str) -> ProfileElement:
|
||||||
return (pe for pe in l if pe.type == wanted_type)
|
return (pe for pe in l if pe.type == wanted_type)
|
||||||
@@ -502,7 +882,8 @@ class Puk(DecimalHexParam):
|
|||||||
allow_len = 8
|
allow_len = 8
|
||||||
rpad = 16
|
rpad = 16
|
||||||
keyReference = None
|
keyReference = None
|
||||||
example_input = '0' * allow_len
|
example_input = f'0*{allow_len}'
|
||||||
|
default_source = param_source.RandomDigitSource
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||||
@@ -516,6 +897,14 @@ class Puk(DecimalHexParam):
|
|||||||
raise ValueError("input template UPP has unexpected structure:"
|
raise ValueError("input template UPP has unexpected structure:"
|
||||||
f" cannot find pukCode with keyReference={cls.keyReference}")
|
f" cannot find pukCode with keyReference={cls.keyReference}")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
mf_pes = pes.pes_by_naa['mf'][0]
|
||||||
|
for pukCodes in obtain_all_pe_from_pelist(mf_pes, 'pukCodes'):
|
||||||
|
for pukCode in pukCodes.decoded['pukCodes']:
|
||||||
|
if pukCode['keyReference'] == cls.keyReference:
|
||||||
|
yield { cls.name: cls.decimal_hex_to_str(pukCode['pukValue']) }
|
||||||
|
|
||||||
class Puk1(Puk):
|
class Puk1(Puk):
|
||||||
name = 'PUK1'
|
name = 'PUK1'
|
||||||
keyReference = 0x01
|
keyReference = 0x01
|
||||||
@@ -529,7 +918,8 @@ class Pin(DecimalHexParam):
|
|||||||
rpad = 16
|
rpad = 16
|
||||||
min_len = 4
|
min_len = 4
|
||||||
max_len = 8
|
max_len = 8
|
||||||
example_input = '0' * max_len
|
example_input = f'0*{max_len}'
|
||||||
|
default_source = param_source.RandomDigitSource
|
||||||
keyReference = None
|
keyReference = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -551,9 +941,24 @@ class Pin(DecimalHexParam):
|
|||||||
raise ValueError('input template UPP has unexpected structure:'
|
raise ValueError('input template UPP has unexpected structure:'
|
||||||
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference}')
|
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference}')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _read_all_pinvalues_from_pe(cls, pe: ProfileElement):
|
||||||
|
"This is a separate function because subclasses may feed different pe arguments."
|
||||||
|
for pinCodes in obtain_all_pe_from_pelist(pe, 'pinCodes'):
|
||||||
|
if pinCodes.decoded['pinCodes'][0] != 'pinconfig':
|
||||||
|
continue
|
||||||
|
|
||||||
|
for pinCode in pinCodes.decoded['pinCodes'][1]:
|
||||||
|
if pinCode['keyReference'] == cls.keyReference:
|
||||||
|
yield { cls.name: cls.decimal_hex_to_str(pinCode['pinValue']) }
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
yield from cls._read_all_pinvalues_from_pe(pes.pes_by_naa['mf'][0])
|
||||||
|
|
||||||
class Pin1(Pin):
|
class Pin1(Pin):
|
||||||
name = 'PIN1'
|
name = 'PIN1'
|
||||||
example_input = '0' * 4 # PIN are usually 4 digits
|
example_input = '0*4' # PIN are usually 4 digits
|
||||||
keyReference = 0x01
|
keyReference = 0x01
|
||||||
|
|
||||||
class Pin2(Pin1):
|
class Pin2(Pin1):
|
||||||
@@ -572,6 +977,14 @@ class Pin2(Pin1):
|
|||||||
raise ValueError('input template UPP has unexpected structure:'
|
raise ValueError('input template UPP has unexpected structure:'
|
||||||
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference} in {naa=}')
|
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference} in {naa=}')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
for naa in pes.pes_by_naa:
|
||||||
|
if naa not in ['usim','isim','csim','telecom']:
|
||||||
|
continue
|
||||||
|
for pe in pes.pes_by_naa[naa]:
|
||||||
|
yield from cls._read_all_pinvalues_from_pe(pe)
|
||||||
|
|
||||||
class Adm1(Pin):
|
class Adm1(Pin):
|
||||||
name = 'ADM1'
|
name = 'ADM1'
|
||||||
keyReference = 0x0A
|
keyReference = 0x0A
|
||||||
@@ -596,26 +1009,59 @@ class AlgoConfig(ConfigurableParameter):
|
|||||||
raise ValueError('input template UPP has unexpected structure:'
|
raise ValueError('input template UPP has unexpected structure:'
|
||||||
f' {cls.__name__} cannot find algoParameter with key={cls.algo_config_key}')
|
f' {cls.__name__} cannot find algoParameter with key={cls.algo_config_key}')
|
||||||
|
|
||||||
class AlgorithmID(DecimalParam, AlgoConfig):
|
@classmethod
|
||||||
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
|
for pe in pes.get_pes_for_type('akaParameter'):
|
||||||
|
algoConfiguration = pe.decoded['algoConfiguration']
|
||||||
|
if len(algoConfiguration) < 2:
|
||||||
|
continue
|
||||||
|
if algoConfiguration[0] != 'algoParameter':
|
||||||
|
continue
|
||||||
|
if not algoConfiguration[1]:
|
||||||
|
continue
|
||||||
|
val = algoConfiguration[1].get(cls.algo_config_key, None)
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
if isinstance(val, bytes):
|
||||||
|
val = b2h(val)
|
||||||
|
# if it is an int (algorithmID), just pass thru as int
|
||||||
|
yield { cls.name: val }
|
||||||
|
|
||||||
|
class AlgorithmID(EnumParam, AlgoConfig):
|
||||||
|
"""use validate_val() from EnumParam, and apply_val() from AlgoConfig.
|
||||||
|
In get_values_from_pes(), return enum value names, not raw values."""
|
||||||
|
name = "Algorithm"
|
||||||
algo_config_key = 'algorithmID'
|
algo_config_key = 'algorithmID'
|
||||||
allow_len = 1
|
example_input = "Milenage"
|
||||||
example_input = 1 # Milenage
|
default_source = param_source.ConstantSource
|
||||||
|
|
||||||
|
# as in pySim/esim/asn1/saip/PE_Definitions-3.3.1.asn
|
||||||
|
class Values(enum.IntEnum):
|
||||||
|
Milenage = 1
|
||||||
|
TUAK = 2
|
||||||
|
usim_test = 3 # input 'usim-test' also accepted via fuzzy matching
|
||||||
|
|
||||||
|
# EnumParam.validate_val() returns the int values from Values
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||||
val = super().validate_val(val)
|
# return enum names, not raw values.
|
||||||
val = int(val)
|
# use of super(): this intends to call AlgoConfig.get_values_from_pes() so that the cls argument is this cls
|
||||||
valid = (1, 2, 3)
|
# here (AlgorithmID); i.e. AlgoConfig.get_values_from_pes(pes) doesn't work, because AlgoConfig needs to look up
|
||||||
if val not in valid:
|
# cls.algo_config_key.
|
||||||
raise ValueError(f'Invalid algorithmID {val!r}, must be one of {valid}')
|
for d in super(cls, cls).get_values_from_pes(pes):
|
||||||
return val
|
if cls.name in d:
|
||||||
|
# convert int to value string
|
||||||
|
val = d[cls.name]
|
||||||
|
d[cls.name] = cls.map_val_to_name(val, strict=True)
|
||||||
|
yield d
|
||||||
|
|
||||||
class K(BinaryParam, AlgoConfig):
|
class K(BinaryParam, AlgoConfig):
|
||||||
"""use validate_val() from BinaryParam, and apply_val() from AlgoConfig"""
|
"""use validate_val() from BinaryParam, and apply_val() from AlgoConfig"""
|
||||||
name = 'K'
|
name = 'K'
|
||||||
algo_config_key = 'key'
|
algo_config_key = 'key'
|
||||||
allow_len = (128 // 8, 256 // 8) # length in bytes (from BinaryParam); TUAK also allows 256 bit
|
allow_len = (128 // 8, 256 // 8) # length in bytes (from BinaryParam); TUAK also allows 256 bit
|
||||||
example_input = '00' * allow_len[0]
|
example_input = f'00*{allow_len[0]}'
|
||||||
|
|
||||||
class Opc(K):
|
class Opc(K):
|
||||||
name = 'OPc'
|
name = 'OPc'
|
||||||
@@ -629,6 +1075,7 @@ class MilenageRotationConstants(BinaryParam, AlgoConfig):
|
|||||||
algo_config_key = 'rotationConstants'
|
algo_config_key = 'rotationConstants'
|
||||||
allow_len = 5 # length in bytes (from BinaryParam)
|
allow_len = 5 # length in bytes (from BinaryParam)
|
||||||
example_input = '40 00 20 40 60'
|
example_input = '40 00 20 40 60'
|
||||||
|
default_source = param_source.ConstantSource
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate_val(cls, val):
|
def validate_val(cls, val):
|
||||||
@@ -659,6 +1106,7 @@ class MilenageXoringConstants(BinaryParam, AlgoConfig):
|
|||||||
' 00000000000000000000000000000002'
|
' 00000000000000000000000000000002'
|
||||||
' 00000000000000000000000000000004'
|
' 00000000000000000000000000000004'
|
||||||
' 00000000000000000000000000000008')
|
' 00000000000000000000000000000008')
|
||||||
|
default_source = param_source.ConstantSource
|
||||||
|
|
||||||
class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
||||||
"""Number of iterations of Keccak-f[1600] permutation as recomended by Section 7.2 of 3GPP TS 35.231"""
|
"""Number of iterations of Keccak-f[1600] permutation as recomended by Section 7.2 of 3GPP TS 35.231"""
|
||||||
@@ -667,3 +1115,4 @@ class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
|||||||
min_val = 1
|
min_val = 1
|
||||||
max_val = 255
|
max_val = 255
|
||||||
example_input = '1'
|
example_input = '1'
|
||||||
|
default_source = param_source.ConstantSource
|
||||||
|
|||||||
+41
-3
@@ -226,9 +226,28 @@ class Icon(BER_TLV_IE, tag=0x94):
|
|||||||
_construct = GreedyBytes
|
_construct = GreedyBytes
|
||||||
class ProfileClass(BER_TLV_IE, tag=0x95):
|
class ProfileClass(BER_TLV_IE, tag=0x95):
|
||||||
_construct = Enum(Int8ub, test=0, provisioning=1, operational=2)
|
_construct = Enum(Int8ub, test=0, provisioning=1, operational=2)
|
||||||
|
class ProfilePolicyRules(BER_TLV_IE, tag=0x99):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
class NotificationConfigurationInfo(BER_TLV_IE, tag=0xb6):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
|
||||||
|
# ProfileOwner
|
||||||
|
class ProfileOwnerPLMN(BER_TLV_IE, tag=0x80):
|
||||||
|
_construct = PlmnAdapter(Bytes(3))
|
||||||
|
class ProfileOwnerGID1(BER_TLV_IE, tag=0x81):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
class ProfileOwnerGID2(BER_TLV_IE, tag=0x82):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
class ProfileOwner(BER_TLV_IE, tag=0xb7, nested=[ProfileOwnerPLMN, ProfileOwnerGID1, ProfileOwnerGID2]):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
|
||||||
|
class SMDPPProprietaryData(BER_TLV_IE, tag=0xb8):
|
||||||
|
_construct = GreedyBytes
|
||||||
|
|
||||||
class ProfileInfo(BER_TLV_IE, tag=0xe3, nested=[Iccid, IsdpAid, ProfileState, ProfileNickname,
|
class ProfileInfo(BER_TLV_IE, tag=0xe3, nested=[Iccid, IsdpAid, ProfileState, ProfileNickname,
|
||||||
ServiceProviderName, ProfileName, IconType, Icon,
|
ServiceProviderName, ProfileName, IconType, Icon,
|
||||||
ProfileClass]): # FIXME: more IEs
|
ProfileClass, ProfilePolicyRules, NotificationConfigurationInfo,
|
||||||
|
ProfileOwner, SMDPPProprietaryData]):
|
||||||
pass
|
pass
|
||||||
class ProfileInfoSeq(BER_TLV_IE, tag=0xa0, nested=[ProfileInfo]):
|
class ProfileInfoSeq(BER_TLV_IE, tag=0xa0, nested=[ProfileInfo]):
|
||||||
pass
|
pass
|
||||||
@@ -444,9 +463,28 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
|
|||||||
d = rn.to_dict()
|
d = rn.to_dict()
|
||||||
self._cmd.poutput_json(flatten_dict_lists(d['notification_sent_resp']))
|
self._cmd.poutput_json(flatten_dict_lists(d['notification_sent_resp']))
|
||||||
|
|
||||||
def do_get_profiles_info(self, _opts):
|
get_profiles_info_parser = argparse.ArgumentParser()
|
||||||
|
get_profiles_info_parser.add_argument('--all', action='store_true', help='Retrieve all known tags of a profile')
|
||||||
|
|
||||||
|
@cmd2.with_argparser(get_profiles_info_parser)
|
||||||
|
def do_get_profiles_info(self, opts):
|
||||||
"""Perform an ES10c GetProfilesInfo function."""
|
"""Perform an ES10c GetProfilesInfo function."""
|
||||||
pi = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, ProfileInfoListReq(), ProfileInfoListResp)
|
if opts.all:
|
||||||
|
tags = [nest.tag for nest in ProfileInfo.nested_collection_cls().nested]
|
||||||
|
u8tags = []
|
||||||
|
# TODO: rework TagList to support 2 byte tags to not filter it into u8 tags
|
||||||
|
for tag in tags:
|
||||||
|
if tag <= 255:
|
||||||
|
u8tags.append(tag)
|
||||||
|
elif tag <= 65535:
|
||||||
|
u8tags.append(tag >> 8)
|
||||||
|
u8tags.append(tag & 0xff)
|
||||||
|
# Ignoring 3 byte tags
|
||||||
|
req = ProfileInfoListReq(children=[TagList(decoded=u8tags)])
|
||||||
|
else:
|
||||||
|
req = ProfileInfoListReq()
|
||||||
|
|
||||||
|
pi = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, req, ProfileInfoListResp)
|
||||||
d = pi.to_dict()
|
d = pi.to_dict()
|
||||||
self._cmd.poutput_json(flatten_dict_lists(d['profile_info_list_resp']))
|
self._cmd.poutput_json(flatten_dict_lists(d['profile_info_list_resp']))
|
||||||
|
|
||||||
|
|||||||
+5
-2
@@ -44,6 +44,7 @@ from pySim.utils import sw_match, decomposeATR
|
|||||||
from pySim.jsonpath import js_path_modify
|
from pySim.jsonpath import js_path_modify
|
||||||
from pySim.commands import SimCardCommands
|
from pySim.commands import SimCardCommands
|
||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
# int: a single service is associated with this file
|
# int: a single service is associated with this file
|
||||||
# list: any of the listed services requires this file
|
# list: any of the listed services requires this file
|
||||||
@@ -52,6 +53,8 @@ CardFileService = Union[int, List[int], Tuple[int, ...]]
|
|||||||
|
|
||||||
Size = Tuple[int, Optional[int]]
|
Size = Tuple[int, Optional[int]]
|
||||||
|
|
||||||
|
log = PySimLogger.get(__name__)
|
||||||
|
|
||||||
class CardFile:
|
class CardFile:
|
||||||
"""Base class for all objects in the smart card filesystem.
|
"""Base class for all objects in the smart card filesystem.
|
||||||
Serve as a common ancestor to all other file types; rarely used directly.
|
Serve as a common ancestor to all other file types; rarely used directly.
|
||||||
@@ -1609,14 +1612,14 @@ class CardModel(abc.ABC):
|
|||||||
card_atr = scc.get_atr()
|
card_atr = scc.get_atr()
|
||||||
for atr in cls._atrs:
|
for atr in cls._atrs:
|
||||||
if atr == card_atr:
|
if atr == card_atr:
|
||||||
print("Detected CardModel:", cls.__name__)
|
log.info("Detected CardModel: %s", cls.__name__)
|
||||||
return True
|
return True
|
||||||
# if nothing found try to just compare the Historical Bytes of the ATR
|
# if nothing found try to just compare the Historical Bytes of the ATR
|
||||||
card_atr_hb = decomposeATR(card_atr)['hb']
|
card_atr_hb = decomposeATR(card_atr)['hb']
|
||||||
for atr in cls._atrs:
|
for atr in cls._atrs:
|
||||||
atr_hb = decomposeATR(atr)['hb']
|
atr_hb = decomposeATR(atr)['hb']
|
||||||
if atr_hb == card_atr_hb:
|
if atr_hb == card_atr_hb:
|
||||||
print("Detected CardModel:", cls.__name__)
|
log.info("Detected CardModel: %s", cls.__name__)
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -27,9 +27,9 @@ from osmocom.utils import b2h
|
|||||||
from osmocom.tlv import bertlv_parse_len, bertlv_encode_len
|
from osmocom.tlv import bertlv_parse_len, bertlv_encode_len
|
||||||
from pySim.utils import parse_command_apdu
|
from pySim.utils import parse_command_apdu
|
||||||
from pySim.secure_channel import SecureChannel
|
from pySim.secure_channel import SecureChannel
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
log = PySimLogger.get(__name__)
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
def scp02_key_derivation(constant: bytes, counter: int, base_key: bytes) -> bytes:
|
def scp02_key_derivation(constant: bytes, counter: int, base_key: bytes) -> bytes:
|
||||||
assert len(constant) == 2
|
assert len(constant) == 2
|
||||||
@@ -75,7 +75,7 @@ class Scp02SessionKeys:
|
|||||||
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
|
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
|
||||||
h = d.decrypt(h)
|
h = d.decrypt(h)
|
||||||
h = e.encrypt(h)
|
h = e.encrypt(h)
|
||||||
logger.debug("mac_1des(%s,icv=%s) -> %s", b2h(data), b2h(icv), b2h(h))
|
log.debug("mac_1des(%s,icv=%s) -> %s", b2h(data), b2h(icv), b2h(h))
|
||||||
if self.des_icv_enc:
|
if self.des_icv_enc:
|
||||||
self.icv = self.des_icv_enc.encrypt(h)
|
self.icv = self.des_icv_enc.encrypt(h)
|
||||||
else:
|
else:
|
||||||
@@ -89,7 +89,7 @@ class Scp02SessionKeys:
|
|||||||
h = b'\x00' * 8
|
h = b'\x00' * 8
|
||||||
for i in range(q):
|
for i in range(q):
|
||||||
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
|
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
|
||||||
logger.debug("mac_3des(%s) -> %s", b2h(data), b2h(h))
|
log.debug("mac_3des(%s) -> %s", b2h(data), b2h(h))
|
||||||
return h
|
return h
|
||||||
|
|
||||||
def __init__(self, counter: int, card_keys: 'GpCardKeyset', icv_encrypt=True):
|
def __init__(self, counter: int, card_keys: 'GpCardKeyset', icv_encrypt=True):
|
||||||
@@ -276,10 +276,10 @@ class SCP02(SCP):
|
|||||||
return cipher.decrypt(ciphertext)
|
return cipher.decrypt(ciphertext)
|
||||||
|
|
||||||
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
|
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
|
||||||
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(host_challenge), b2h(card_challenge))
|
log.debug("host_challenge(%s), card_challenge(%s)", b2h(host_challenge), b2h(card_challenge))
|
||||||
self.host_cryptogram = self.sk.calc_mac_3des(self.sk.counter.to_bytes(2, 'big') + card_challenge + host_challenge)
|
self.host_cryptogram = self.sk.calc_mac_3des(self.sk.counter.to_bytes(2, 'big') + card_challenge + host_challenge)
|
||||||
self.card_cryptogram = self.sk.calc_mac_3des(self.host_challenge + self.sk.counter.to_bytes(2, 'big') + card_challenge)
|
self.card_cryptogram = self.sk.calc_mac_3des(self.host_challenge + self.sk.counter.to_bytes(2, 'big') + card_challenge)
|
||||||
logger.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
|
log.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
|
||||||
|
|
||||||
def gen_init_update_apdu(self, host_challenge: bytes = b'\x00'*8) -> bytes:
|
def gen_init_update_apdu(self, host_challenge: bytes = b'\x00'*8) -> bytes:
|
||||||
"""Generate INITIALIZE UPDATE APDU."""
|
"""Generate INITIALIZE UPDATE APDU."""
|
||||||
@@ -291,7 +291,7 @@ class SCP02(SCP):
|
|||||||
resp = self.constr_iur.parse(resp_bin)
|
resp = self.constr_iur.parse(resp_bin)
|
||||||
self.card_challenge = resp['card_challenge']
|
self.card_challenge = resp['card_challenge']
|
||||||
self.sk = Scp02SessionKeys(resp['seq_counter'], self.card_keys)
|
self.sk = Scp02SessionKeys(resp['seq_counter'], self.card_keys)
|
||||||
logger.debug(self.sk)
|
log.debug(self.sk)
|
||||||
self._compute_cryptograms(self.card_challenge, self.host_challenge)
|
self._compute_cryptograms(self.card_challenge, self.host_challenge)
|
||||||
if self.card_cryptogram != resp['card_cryptogram']:
|
if self.card_cryptogram != resp['card_cryptogram']:
|
||||||
raise ValueError("card cryptogram doesn't match")
|
raise ValueError("card cryptogram doesn't match")
|
||||||
@@ -311,7 +311,7 @@ class SCP02(SCP):
|
|||||||
|
|
||||||
def _wrap_cmd_apdu(self, apdu: bytes, *args, **kwargs) -> bytes:
|
def _wrap_cmd_apdu(self, apdu: bytes, *args, **kwargs) -> bytes:
|
||||||
"""Wrap Command APDU for SCP02: calculate MAC and encrypt."""
|
"""Wrap Command APDU for SCP02: calculate MAC and encrypt."""
|
||||||
logger.debug("wrap_cmd_apdu(%s)", b2h(apdu))
|
log.debug("wrap_cmd_apdu(%s)", b2h(apdu))
|
||||||
|
|
||||||
if not self.do_cmac:
|
if not self.do_cmac:
|
||||||
return apdu
|
return apdu
|
||||||
@@ -378,7 +378,7 @@ def scp03_key_derivation(constant: bytes, context: bytes, base_key: bytes, l: Op
|
|||||||
if l is None:
|
if l is None:
|
||||||
l = len(base_key) * 8
|
l = len(base_key) * 8
|
||||||
|
|
||||||
logger.debug("scp03_kdf(constant=%s, context=%s, base_key=%s, l=%u)", b2h(constant), b2h(context), b2h(base_key), l)
|
log.debug("scp03_kdf(constant=%s, context=%s, base_key=%s, l=%u)", b2h(constant), b2h(context), b2h(base_key), l)
|
||||||
output_len = l // 8
|
output_len = l // 8
|
||||||
# SCP03 Section 4.1.5 defines a different parameter order than NIST SP 800-108, so we cannot use the
|
# SCP03 Section 4.1.5 defines a different parameter order than NIST SP 800-108, so we cannot use the
|
||||||
# existing Cryptodome.Protocol.KDF.SP800_108_Counter function :(
|
# existing Cryptodome.Protocol.KDF.SP800_108_Counter function :(
|
||||||
@@ -451,7 +451,7 @@ class Scp03SessionKeys:
|
|||||||
# This block SHALL be encrypted with S-ENC to produce the ICV for command encryption.
|
# This block SHALL be encrypted with S-ENC to produce the ICV for command encryption.
|
||||||
cipher = AES.new(self.s_enc, AES.MODE_CBC, iv)
|
cipher = AES.new(self.s_enc, AES.MODE_CBC, iv)
|
||||||
icv = cipher.encrypt(data)
|
icv = cipher.encrypt(data)
|
||||||
logger.debug("_get_icv(data=%s, is_resp=%s) -> icv=%s", b2h(data), is_response, b2h(icv))
|
log.debug("_get_icv(data=%s, is_resp=%s) -> icv=%s", b2h(data), is_response, b2h(icv))
|
||||||
return icv
|
return icv
|
||||||
|
|
||||||
# TODO: Resolve duplication with pySim.esim.bsp.BspAlgoCryptAES128 which provides pad80-wrapping
|
# TODO: Resolve duplication with pySim.esim.bsp.BspAlgoCryptAES128 which provides pad80-wrapping
|
||||||
@@ -489,12 +489,12 @@ class SCP03(SCP):
|
|||||||
return cipher.decrypt(ciphertext)
|
return cipher.decrypt(ciphertext)
|
||||||
|
|
||||||
def _compute_cryptograms(self):
|
def _compute_cryptograms(self):
|
||||||
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(self.host_challenge), b2h(self.card_challenge))
|
log.debug("host_challenge(%s), card_challenge(%s)", b2h(self.host_challenge), b2h(self.card_challenge))
|
||||||
# Card + Host Authentication Cryptogram: Section 6.2.2.2 + 6.2.2.3
|
# Card + Host Authentication Cryptogram: Section 6.2.2.2 + 6.2.2.3
|
||||||
context = self.host_challenge + self.card_challenge
|
context = self.host_challenge + self.card_challenge
|
||||||
self.card_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_CARD, context, self.sk.s_mac, l=self.s_mode*8)
|
self.card_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_CARD, context, self.sk.s_mac, l=self.s_mode*8)
|
||||||
self.host_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_HOST, context, self.sk.s_mac, l=self.s_mode*8)
|
self.host_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_HOST, context, self.sk.s_mac, l=self.s_mode*8)
|
||||||
logger.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
|
log.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
|
||||||
|
|
||||||
def gen_init_update_apdu(self, host_challenge: Optional[bytes] = None) -> bytes:
|
def gen_init_update_apdu(self, host_challenge: Optional[bytes] = None) -> bytes:
|
||||||
"""Generate INITIALIZE UPDATE APDU."""
|
"""Generate INITIALIZE UPDATE APDU."""
|
||||||
@@ -514,7 +514,7 @@ class SCP03(SCP):
|
|||||||
self.i_param = resp['i_param']
|
self.i_param = resp['i_param']
|
||||||
# derive session keys and compute cryptograms
|
# derive session keys and compute cryptograms
|
||||||
self.sk = Scp03SessionKeys(self.card_keys, self.host_challenge, self.card_challenge)
|
self.sk = Scp03SessionKeys(self.card_keys, self.host_challenge, self.card_challenge)
|
||||||
logger.debug(self.sk)
|
log.debug(self.sk)
|
||||||
self._compute_cryptograms()
|
self._compute_cryptograms()
|
||||||
# verify computed cryptogram matches received cryptogram
|
# verify computed cryptogram matches received cryptogram
|
||||||
if self.card_cryptogram != resp['card_cryptogram']:
|
if self.card_cryptogram != resp['card_cryptogram']:
|
||||||
@@ -529,7 +529,7 @@ class SCP03(SCP):
|
|||||||
|
|
||||||
def _wrap_cmd_apdu(self, apdu: bytes, skip_cenc: bool = False) -> bytes:
|
def _wrap_cmd_apdu(self, apdu: bytes, skip_cenc: bool = False) -> bytes:
|
||||||
"""Wrap Command APDU for SCP03: calculate MAC and encrypt."""
|
"""Wrap Command APDU for SCP03: calculate MAC and encrypt."""
|
||||||
logger.debug("wrap_cmd_apdu(%s)", b2h(apdu))
|
log.debug("wrap_cmd_apdu(%s)", b2h(apdu))
|
||||||
|
|
||||||
if not self.do_cmac:
|
if not self.do_cmac:
|
||||||
return apdu
|
return apdu
|
||||||
@@ -584,7 +584,7 @@ class SCP03(SCP):
|
|||||||
# status word: in this case only the status word shall be returned in the response. All status words
|
# status word: in this case only the status word shall be returned in the response. All status words
|
||||||
# except '9000' and warning status words (i.e. '62xx' and '63xx') shall be interpreted as error status
|
# except '9000' and warning status words (i.e. '62xx' and '63xx') shall be interpreted as error status
|
||||||
# words.
|
# words.
|
||||||
logger.debug("unwrap_rsp_apdu(sw=%s, rsp_apdu=%s)", sw, rsp_apdu)
|
log.debug("unwrap_rsp_apdu(sw=%s, rsp_apdu=%s)", sw, rsp_apdu)
|
||||||
if not self.do_rmac:
|
if not self.do_rmac:
|
||||||
assert not self.do_renc
|
assert not self.do_renc
|
||||||
return rsp_apdu
|
return rsp_apdu
|
||||||
@@ -600,9 +600,9 @@ class SCP03(SCP):
|
|||||||
if self.do_renc:
|
if self.do_renc:
|
||||||
# decrypt response data
|
# decrypt response data
|
||||||
decrypted = self.sk._decrypt(response_data)
|
decrypted = self.sk._decrypt(response_data)
|
||||||
logger.debug("decrypted: %s", b2h(decrypted))
|
log.debug("decrypted: %s", b2h(decrypted))
|
||||||
# remove padding
|
# remove padding
|
||||||
response_data = unpad80(decrypted)
|
response_data = unpad80(decrypted)
|
||||||
logger.debug("response_data: %s", b2h(response_data))
|
log.debug("response_data: %s", b2h(response_data))
|
||||||
|
|
||||||
return response_data
|
return response_data
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
|||||||
|
|
||||||
# Key Usage:
|
# Key Usage:
|
||||||
# KVN 0x01 .. 0x0F reserved for SCP80
|
# KVN 0x01 .. 0x0F reserved for SCP80
|
||||||
|
# KVN 0x81 .. 0x8f reserved for SCP81
|
||||||
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
||||||
# KVN 0x20 .. 0x2F reserved for SCP02
|
# KVN 0x20 .. 0x2F reserved for SCP02
|
||||||
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
||||||
|
|||||||
@@ -152,7 +152,8 @@ class SimCard(SimCardBase):
|
|||||||
return sw
|
return sw
|
||||||
|
|
||||||
def update_smsp(self, smsp):
|
def update_smsp(self, smsp):
|
||||||
data, sw = self._scc.update_record(EF['SMSP'], 1, rpad(smsp, 84))
|
print("using update_smsp")
|
||||||
|
data, sw = self._scc.update_record(EF['SMSP'], 1, smsp, leftpad=True)
|
||||||
return sw
|
return sw
|
||||||
|
|
||||||
def update_ad(self, mnc=None, opmode=None, ofm=None, path=EF['AD']):
|
def update_ad(self, mnc=None, opmode=None, ofm=None, path=EF['AD']):
|
||||||
|
|||||||
+1
-1
@@ -44,7 +44,7 @@ class PySimLogger:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
||||||
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- " + LOG_FMTSTR
|
LOG_FMTSTR_VERBOSE = "%(name)s.%(lineno)d -- " + LOG_FMTSTR
|
||||||
__formatter = logging.Formatter(LOG_FMTSTR)
|
__formatter = logging.Formatter(LOG_FMTSTR)
|
||||||
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
||||||
|
|
||||||
|
|||||||
+46
-16
@@ -301,24 +301,54 @@ class LinkBaseTpdu(LinkBase):
|
|||||||
|
|
||||||
prev_tpdu = tpdu
|
prev_tpdu = tpdu
|
||||||
data, sw = self.send_tpdu(tpdu)
|
data, sw = self.send_tpdu(tpdu)
|
||||||
|
log.debug("T0: case #%u TPDU: %s => %s %s", case, tpdu, data or "(no data)", sw or "(no status word)")
|
||||||
|
if sw is None:
|
||||||
|
raise ValueError("no status word received")
|
||||||
|
|
||||||
# When we have sent the first APDU, the SW may indicate that there are response bytes
|
# After sending the APDU/TPDU the UICC/eUICC or SIM may response with a status word that indicates that further
|
||||||
# available. There are two SWs commonly used for this 9fxx (sim) and 61xx (usim), where
|
# TPDUs have to be sent in order to complete the task.
|
||||||
# xx is the number of response bytes available.
|
if case == 4 or self.apdu_strict == False:
|
||||||
# See also:
|
# In case the APDU is a case #4 APDU, the UICC/eUICC/SIM may indicate that there is response data
|
||||||
if sw is not None:
|
# available which has to be retrieved using a GET RESPONSE command TPDU.
|
||||||
while (sw[0:2] in ['9f', '61', '62', '63']):
|
#
|
||||||
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
|
# ETSI TS 102 221, section 7.3.1.1.4 is very cleare about the fact that the GET RESPONSE mechanism
|
||||||
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
|
# shall only apply on case #4 APDUs but unfortunately it is impossible to distinguish between case #3
|
||||||
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
|
# and case #4 when the APDU format is not strictly followed. In order to be able to detect case #4
|
||||||
tpdu_gr = tpdu[0:2] + 'c00000' + sw[2:4]
|
# correctly the Le byte (usually 0x00) must be present, is often forgotten. To avoid problems with
|
||||||
|
# legacy scripts that use raw APDU strings, we will still loosely apply GET RESPONSE based on what
|
||||||
|
# the status word indicates. Unless the user explicitly enables the strict mode (set apdu_strict true)
|
||||||
|
while True:
|
||||||
|
if sw in ['9000', '9100']:
|
||||||
|
# A status word of 9000 (or 9100 in case there is pending data from a proactive SIM command)
|
||||||
|
# indicates that either no response data was returnd or all response data has been retrieved
|
||||||
|
# successfully. We may discontinue the processing at this point.
|
||||||
|
break;
|
||||||
|
if sw[0:2] in ['61', '9f']:
|
||||||
|
# A status word of 61xx or 9fxx indicates that there is (still) response data available. We
|
||||||
|
# send a GET RESPONSE command with the length value indicated in the second byte of the status
|
||||||
|
# word. (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4a and 3GPP TS 51.011 9.4.1 and
|
||||||
|
# ISO/IEC 7816-4, Table 5)
|
||||||
|
le_gr = sw[2:4]
|
||||||
|
elif sw[0:2] in ['62', '63']:
|
||||||
|
# There are corner cases (status word is 62xx or 63xx) where the UICC/eUICC/SIM asks us
|
||||||
|
# to send a dummy GET RESPONSE command. We send a GET RESPONSE command with a length of 0.
|
||||||
|
# (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4b and ETSI TS 151 011, section 9.4.1)
|
||||||
|
le_gr = '00'
|
||||||
|
else:
|
||||||
|
# A status word other then the ones covered by the above logic may indicate an error. In this
|
||||||
|
# case we will discontinue the processing as well.
|
||||||
|
# (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4c)
|
||||||
|
break
|
||||||
|
tpdu_gr = tpdu[0:2] + 'c00000' + le_gr
|
||||||
prev_tpdu = tpdu_gr
|
prev_tpdu = tpdu_gr
|
||||||
d, sw = self.send_tpdu(tpdu_gr)
|
data_gr, sw = self.send_tpdu(tpdu_gr)
|
||||||
data += d
|
log.debug("T0: GET RESPONSE TPDU: %s => %s %s", tpdu_gr, data_gr or "(no data)", sw or "(no status word)")
|
||||||
if sw[0:2] == '6c':
|
data += data_gr
|
||||||
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
if sw[0:2] == '6c':
|
||||||
tpdu_gr = prev_tpdu[0:8] + sw[2:4]
|
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
||||||
data, sw = self.send_tpdu(tpdu_gr)
|
tpdu_gr = prev_tpdu[0:8] + sw[2:4]
|
||||||
|
data, sw = self.send_tpdu(tpdu_gr)
|
||||||
|
log.debug("T0: repated case #%u TPDU: %s => %s %s", case, tpdu_gr, data or "(no data)", sw or "(no status word)")
|
||||||
|
|
||||||
return data, sw
|
return data, sw
|
||||||
|
|
||||||
|
|||||||
+51
-8
@@ -251,6 +251,16 @@ class EF_SMSP(LinFixedEF):
|
|||||||
"numbering_plan_id": "isdn_e164" },
|
"numbering_plan_id": "isdn_e164" },
|
||||||
"call_number": "4915790109999" },
|
"call_number": "4915790109999" },
|
||||||
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
|
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
|
||||||
|
( 'e1ffffffffffffffffffffffff0891945197109099f9ffffff0000a9',
|
||||||
|
{ "alpha_id": "", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
|
||||||
|
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
|
||||||
|
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
|
||||||
|
"numbering_plan_id": "reserved_for_extension" },
|
||||||
|
"call_number": "" },
|
||||||
|
"tp_sc_addr": { "length": 8, "ton_npi": { "ext": True, "type_of_number": "international",
|
||||||
|
"numbering_plan_id": "isdn_e164" },
|
||||||
|
"call_number": "4915790109999" },
|
||||||
|
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
|
||||||
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
|
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
|
||||||
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
|
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
|
||||||
"tp_pid": True, "tp_dcs": True, "tp_vp": False },
|
"tp_pid": True, "tp_dcs": True, "tp_vp": False },
|
||||||
@@ -331,7 +341,8 @@ class EF_SMSP(LinFixedEF):
|
|||||||
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
|
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
|
||||||
DestAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.dest_addr_len(ctx)),
|
DestAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.dest_addr_len(ctx)),
|
||||||
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
|
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
|
||||||
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
|
# (see comment below)
|
||||||
|
self._construct = Struct('alpha_id'/GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28))),
|
||||||
'parameter_indicators'/InvertAdapter(BitStruct(
|
'parameter_indicators'/InvertAdapter(BitStruct(
|
||||||
Const(7, BitsInteger(3)),
|
Const(7, BitsInteger(3)),
|
||||||
'tp_vp'/Flag,
|
'tp_vp'/Flag,
|
||||||
@@ -345,6 +356,25 @@ class EF_SMSP(LinFixedEF):
|
|||||||
'tp_dcs'/Bytes(1),
|
'tp_dcs'/Bytes(1),
|
||||||
'tp_vp_minutes'/EF_SMSP.ValidityPeriodAdapter(Byte))
|
'tp_vp_minutes'/EF_SMSP.ValidityPeriodAdapter(Byte))
|
||||||
|
|
||||||
|
# Ensure 'alpha_id' is always present
|
||||||
|
def encode_record_hex(self, abstract_data: dict, record_nr: int, total_len: int = None) -> str:
|
||||||
|
# Problem: TS 51.011 Section 10.5.6 describes the 'alpha_id' field as optional. However, this is only true
|
||||||
|
# at the time when the record length of the file is set up in the file system. A card manufacturer may decide
|
||||||
|
# to remove the field by setting the record length to 28. Likewise, the card manaufacturer may also decide to
|
||||||
|
# set the field to a distinct length by setting the record length to a value greater than 28 (e.g. 14 bytes
|
||||||
|
# 'alpha_id' + 28 bytes). Due to the fixed nature of the record length, this eventually means that in practice
|
||||||
|
# 'alpha_id' is a mandatory field with a fixed length.
|
||||||
|
#
|
||||||
|
# Due to the problematic specification of 'alpha_id' as a pseudo-optional field at the beginning of a
|
||||||
|
# fixed-size memory, the construct definition in self._construct has been incorrectly implemented and the field
|
||||||
|
# has been marked as COptional. We may correct the problem by removing COptional. But to maintain compatibility,
|
||||||
|
# we then have to ensure that in case the field is not provided (None), it is set to an empty string ('').
|
||||||
|
#
|
||||||
|
# See also ts_31_102.py, class EF_OCI for a correct example.
|
||||||
|
if abstract_data['alpha_id'] is None:
|
||||||
|
abstract_data['alpha_id'] = ''
|
||||||
|
return super().encode_record_hex(abstract_data, record_nr, total_len)
|
||||||
|
|
||||||
# TS 51.011 Section 10.5.7
|
# TS 51.011 Section 10.5.7
|
||||||
class EF_SMSS(TransparentEF):
|
class EF_SMSS(TransparentEF):
|
||||||
class MemCapAdapter(Adapter):
|
class MemCapAdapter(Adapter):
|
||||||
@@ -1233,9 +1263,11 @@ class CardProfileSIM(CardProfile):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def decode_select_response(resp_hex: str) -> object:
|
def decode_select_response(resp_hex: str) -> object:
|
||||||
# we try to build something that resembles a dict resulting from the TLV decoder
|
"""
|
||||||
# of TS 102.221 (FcpTemplate), so that higher-level code only has to deal with one
|
Decode the select response to a dict representation, similar to the one of TS 102.221 (see ts_102_221.py,
|
||||||
# format of SELECT response
|
class FcpTemplate), so that higher-level code only has to deal with one respresentation. See also
|
||||||
|
3GPP TS 51.011, section 9.2.1
|
||||||
|
"""
|
||||||
resp_bin = h2b(resp_hex)
|
resp_bin = h2b(resp_hex)
|
||||||
struct_of_file_map = {
|
struct_of_file_map = {
|
||||||
0: 'transparent',
|
0: 'transparent',
|
||||||
@@ -1273,13 +1305,24 @@ class CardProfileSIM(CardProfile):
|
|||||||
record_len = resp_bin[14]
|
record_len = resp_bin[14]
|
||||||
ret['file_descriptor']['record_len'] = record_len
|
ret['file_descriptor']['record_len'] = record_len
|
||||||
ret['file_descriptor']['num_of_rec'] = ret['file_size'] // record_len
|
ret['file_descriptor']['num_of_rec'] = ret['file_size'] // record_len
|
||||||
ret['access_conditions'] = b2h(resp_bin[8:10])
|
ret['access_conditions'] = b2h(resp_bin[8:11])
|
||||||
if resp_bin[11] & 0x01 == 0:
|
|
||||||
|
# Life cycle status integer, see also ETSI TS 102 221, table 11.7b
|
||||||
|
lcsi = resp_bin[11]
|
||||||
|
if lcsi == 0x00:
|
||||||
|
ret['life_cycle_status_int'] = 'no_information'
|
||||||
|
elif lcsi == 0x01:
|
||||||
|
ret['life_cycle_status_int'] = 'creation'
|
||||||
|
elif lcsi == 0x03:
|
||||||
|
ret['life_cycle_status_int'] = 'initialization'
|
||||||
|
elif lcsi & 0xFD == 0x05:
|
||||||
ret['life_cycle_status_int'] = 'operational_activated'
|
ret['life_cycle_status_int'] = 'operational_activated'
|
||||||
elif resp_bin[11] & 0x04:
|
elif lcsi & 0xFD == 0x04:
|
||||||
ret['life_cycle_status_int'] = 'operational_deactivated'
|
ret['life_cycle_status_int'] = 'operational_deactivated'
|
||||||
|
elif lcsi & 0xFC == 0x0C:
|
||||||
|
ret['life_cycle_status_int'] = 'termination'
|
||||||
else:
|
else:
|
||||||
ret['life_cycle_status_int'] = 'terminated'
|
ret['life_cycle_status_int'] = lcsi
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -4,3 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[tool.pylint.main]
|
[tool.pylint.main]
|
||||||
ignored-classes = ["twisted.internet.reactor"]
|
ignored-classes = ["twisted.internet.reactor"]
|
||||||
|
|
||||||
|
[tool.pylint.TYPECHECK]
|
||||||
|
# SdKey subclasses are generated dynamically via SdKey.generate_sd_key_classes()
|
||||||
|
generated-members = ["SdKey[A-Za-z0-9]+"]
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ ICCID: 8988219000000117833
|
|||||||
IMSI: 001010000000111
|
IMSI: 001010000000111
|
||||||
GID1: ffffffffffffffff
|
GID1: ffffffffffffffff
|
||||||
GID2: ffffffffffffffff
|
GID2: ffffffffffffffff
|
||||||
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
|
SMSP: ffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||||
SMSC: 0015555
|
SMSC: 0015555
|
||||||
SPN: Fairwaves
|
SPN: Fairwaves
|
||||||
Show in HPLMN: False
|
Show in HPLMN: False
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ ICCID: 89445310150011013678
|
|||||||
IMSI: 001010000000102
|
IMSI: 001010000000102
|
||||||
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
||||||
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
||||||
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
|
SMSP: ffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||||
SMSC: 0015555
|
SMSC: 0015555
|
||||||
SPN: wavemobile
|
SPN: wavemobile
|
||||||
Show in HPLMN: False
|
Show in HPLMN: False
|
||||||
|
|||||||
@@ -7,10 +7,24 @@ set apdu_strict true
|
|||||||
# No command data field, No response data field present
|
# No command data field, No response data field present
|
||||||
apdu 00700001 --expect-sw 9000 --expect-response-regex '^$'
|
apdu 00700001 --expect-sw 9000 --expect-response-regex '^$'
|
||||||
|
|
||||||
|
# Case #1: (verify pin)
|
||||||
|
# This command returns the number of remaining authentication attempts in the
|
||||||
|
# form of a status that has the form 63cX, where X is the number of remaining
|
||||||
|
# attempts. Such a status word can be easily confused with the response to a
|
||||||
|
# case #4 APDU. This test checks if the transport layer correctly distinguishes
|
||||||
|
# the between APDU case #1 and APDU case #4.
|
||||||
|
apdu 0020000A --expect-sw 63c? --expect-response-regex '^$'
|
||||||
|
|
||||||
# Case #2: (status)
|
# Case #2: (status)
|
||||||
# No command data field, Response data field present
|
# No command data field, Response data field present
|
||||||
apdu 80F2000000 --expect-sw 9000 --expect-response-regex '^[a-fA-F0-9]+$'
|
apdu 80F2000000 --expect-sw 9000 --expect-response-regex '^[a-fA-F0-9]+$'
|
||||||
|
|
||||||
|
# Case #2: (verify pin)
|
||||||
|
# (see also above). This test checks if the transport layer is also able to
|
||||||
|
# distinguish correctly between APDU case #2 (with zero length response) and
|
||||||
|
# APDU case #4.
|
||||||
|
apdu 0020000A00 --expect-sw 63c? --expect-response-regex '^$'
|
||||||
|
|
||||||
# Case #3: (terminal capability)
|
# Case #3: (terminal capability)
|
||||||
# Command data field present, No response data field
|
# Command data field present, No response data field
|
||||||
apdu 80AA000005a903830180 --expect-sw 9000 --expect-response-regex '^$'
|
apdu 80AA000005a903830180 --expect-sw 9000 --expect-response-regex '^$'
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
# Utility to verify the functionality of pySim-trace.py
|
# Utility to verify the functionality of pySim-smpp2sim.py
|
||||||
#
|
#
|
||||||
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
|
|||||||
Symlink
+1
@@ -0,0 +1 @@
|
|||||||
|
../../smdpp-data
|
||||||
@@ -20,7 +20,8 @@ class TestCardKeyProviderCsv(unittest.TestCase):
|
|||||||
"KIK3" : "00010204040506070809488B0C0D0E0F"}
|
"KIK3" : "00010204040506070809488B0C0D0E0F"}
|
||||||
|
|
||||||
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
|
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
|
||||||
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
|
card_key_field_cryptor = CardKeyFieldCryptor(column_keys)
|
||||||
|
card_key_provider_register(CardKeyProviderCsv(csv_file_path, card_key_field_cryptor))
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def test_card_key_provider_get(self):
|
def test_card_key_provider_get(self):
|
||||||
|
|||||||
+633
@@ -0,0 +1,633 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||||
|
#
|
||||||
|
# Author: Neels Hofmeyr
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import enum
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from importlib import resources
|
||||||
|
from osmocom.utils import hexstr
|
||||||
|
from pySim.esim.saip import ProfileElementSequence
|
||||||
|
import pySim.esim.saip.personalization as p13n
|
||||||
|
import smdpp_data.upp
|
||||||
|
|
||||||
|
import xo
|
||||||
|
update_expected_output = False
|
||||||
|
|
||||||
|
def valstr(val):
|
||||||
|
if isinstance(val, io.BytesIO):
|
||||||
|
val = val.getvalue()
|
||||||
|
if isinstance(val, bytearray):
|
||||||
|
val = bytes(val)
|
||||||
|
return f'{val!r}'
|
||||||
|
|
||||||
|
def valtypestr(val):
|
||||||
|
if isinstance(val, dict):
|
||||||
|
types = []
|
||||||
|
for v in val.values():
|
||||||
|
types.append(f'{type(v).__name__}')
|
||||||
|
|
||||||
|
val_type = '{' + ', '.join(types) + '}'
|
||||||
|
else:
|
||||||
|
val_type = f'{type(val).__name__}'
|
||||||
|
return f'{valstr(val)}:{val_type}'
|
||||||
|
|
||||||
|
class ConfigurableParameterTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_parameters(self):
|
||||||
|
|
||||||
|
upp_fnames = (
|
||||||
|
'TS48v5_SAIP2.1A_NoBERTLV.der',
|
||||||
|
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
|
||||||
|
'TS48v5_SAIP2.1B_NoBERTLV.der',
|
||||||
|
'TS48v5_SAIP2.3_NoBERTLV.der',
|
||||||
|
)
|
||||||
|
|
||||||
|
class Paramtest:
|
||||||
|
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
|
||||||
|
self.param_cls = param_cls
|
||||||
|
self.val = val
|
||||||
|
self.expect_clean_val = expect_clean_val
|
||||||
|
self.expect_val = expect_val
|
||||||
|
|
||||||
|
param_tests = [
|
||||||
|
Paramtest(param_cls=p13n.Imsi, val='123456',
|
||||||
|
expect_clean_val=str('123456'),
|
||||||
|
expect_val={'IMSI': hexstr('123456'),
|
||||||
|
'IMSI-ACC': '0040'}),
|
||||||
|
Paramtest(param_cls=p13n.Imsi, val=int(123456),
|
||||||
|
expect_val={'IMSI': hexstr('123456'),
|
||||||
|
'IMSI-ACC': '0040'}),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
|
||||||
|
expect_clean_val=str('123456789012345'),
|
||||||
|
expect_val={'IMSI': hexstr('123456789012345'),
|
||||||
|
'IMSI-ACC': '0020'}),
|
||||||
|
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
|
||||||
|
expect_val={'IMSI': hexstr('123456789012345'),
|
||||||
|
'IMSI-ACC': '0020'}),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Puk1,
|
||||||
|
val='12345678',
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
Paramtest(param_cls=p13n.Puk1,
|
||||||
|
val=int(12345678),
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Puk2,
|
||||||
|
val='12345678',
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val='1234',
|
||||||
|
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||||
|
expect_val='1234'),
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val='123456',
|
||||||
|
expect_clean_val=b'123456\xff\xff',
|
||||||
|
expect_val='123456'),
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val='12345678',
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val=int(1234),
|
||||||
|
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||||
|
expect_val='1234'),
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val=int(123456),
|
||||||
|
expect_clean_val=b'123456\xff\xff',
|
||||||
|
expect_val='123456'),
|
||||||
|
Paramtest(param_cls=p13n.Pin1,
|
||||||
|
val=int(12345678),
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Adm1,
|
||||||
|
val='1234',
|
||||||
|
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||||
|
expect_val='1234'),
|
||||||
|
Paramtest(param_cls=p13n.Adm1,
|
||||||
|
val='123456',
|
||||||
|
expect_clean_val=b'123456\xff\xff',
|
||||||
|
expect_val='123456'),
|
||||||
|
Paramtest(param_cls=p13n.Adm1,
|
||||||
|
val='12345678',
|
||||||
|
expect_clean_val=b'12345678',
|
||||||
|
expect_val='12345678'),
|
||||||
|
Paramtest(param_cls=p13n.Adm1,
|
||||||
|
val=int(123456),
|
||||||
|
expect_clean_val=b'123456\xff\xff',
|
||||||
|
expect_val='123456'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val='Milenage',
|
||||||
|
expect_clean_val=1,
|
||||||
|
expect_val='Milenage'),
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val='TUAK',
|
||||||
|
expect_clean_val=2,
|
||||||
|
expect_val='TUAK'),
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val='usim-test',
|
||||||
|
expect_clean_val=3,
|
||||||
|
expect_val='usim_test'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val=1,
|
||||||
|
expect_clean_val=1,
|
||||||
|
expect_val='Milenage'),
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val=2,
|
||||||
|
expect_clean_val=2,
|
||||||
|
expect_val='TUAK'),
|
||||||
|
Paramtest(param_cls=p13n.AlgorithmID,
|
||||||
|
val=3,
|
||||||
|
expect_clean_val=3,
|
||||||
|
expect_val='usim_test'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.K,
|
||||||
|
val='01020304050607080910111213141516',
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.K,
|
||||||
|
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.K,
|
||||||
|
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.K,
|
||||||
|
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.K,
|
||||||
|
val=int(11020304050607080910111213141516),
|
||||||
|
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='11020304050607080910111213141516'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.Opc,
|
||||||
|
val='01020304050607080910111213141516',
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.Opc,
|
||||||
|
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.Opc,
|
||||||
|
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
Paramtest(param_cls=p13n.Opc,
|
||||||
|
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||||
|
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||||
|
expect_val='01020304050607080910111213141516'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.SmspTpScAddr,
|
||||||
|
val='+1234567',
|
||||||
|
expect_clean_val=(True, '1234567'),
|
||||||
|
expect_val='+1234567'),
|
||||||
|
Paramtest(param_cls=p13n.SmspTpScAddr,
|
||||||
|
val=1234567,
|
||||||
|
expect_clean_val=(False, '1234567'),
|
||||||
|
expect_val='1234567'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
|
||||||
|
val='123',
|
||||||
|
expect_clean_val=123,
|
||||||
|
expect_val='123'),
|
||||||
|
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
|
||||||
|
val=123,
|
||||||
|
expect_clean_val=123,
|
||||||
|
expect_val='123'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||||
|
val='0a 0b 0c 01 02',
|
||||||
|
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||||
|
expect_val='0a0b0c0102'),
|
||||||
|
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||||
|
val=b'\x0a\x0b\x0c\x01\x02',
|
||||||
|
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||||
|
expect_val='0a0b0c0102'),
|
||||||
|
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||||
|
val=bytearray(b'\x0a\x0b\x0c\x01\x02'),
|
||||||
|
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||||
|
expect_val='0a0b0c0102'),
|
||||||
|
|
||||||
|
Paramtest(param_cls=p13n.MilenageXoringConstants,
|
||||||
|
val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||||
|
' bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||||
|
' cccccccccccccccccccccccccccccccc'
|
||||||
|
' 11111111111111111111111111111111'
|
||||||
|
' 22222222222222222222222222222222',
|
||||||
|
expect_clean_val=b'\xaa' * 16
|
||||||
|
+ b'\xbb' * 16
|
||||||
|
+ b'\xcc' * 16
|
||||||
|
+ b'\x11' * 16
|
||||||
|
+ b'\x22' * 16,
|
||||||
|
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||||
|
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||||
|
'cccccccccccccccccccccccccccccccc'
|
||||||
|
'11111111111111111111111111111111'
|
||||||
|
'22222222222222222222222222222222'),
|
||||||
|
Paramtest(param_cls=p13n.MilenageXoringConstants,
|
||||||
|
val=b'\xaa' * 16
|
||||||
|
+ b'\xbb' * 16
|
||||||
|
+ b'\xcc' * 16
|
||||||
|
+ b'\x11' * 16
|
||||||
|
+ b'\x22' * 16,
|
||||||
|
expect_clean_val=b'\xaa' * 16
|
||||||
|
+ b'\xbb' * 16
|
||||||
|
+ b'\xcc' * 16
|
||||||
|
+ b'\x11' * 16
|
||||||
|
+ b'\x22' * 16,
|
||||||
|
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||||
|
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||||
|
'cccccccccccccccccccccccccccccccc'
|
||||||
|
'11111111111111111111111111111111'
|
||||||
|
'22222222222222222222222222222222'),
|
||||||
|
|
||||||
|
]
|
||||||
|
|
||||||
|
for sdkey_cls in (
|
||||||
|
# thin out the number of tests, as a compromise between completeness and test runtime
|
||||||
|
p13n.SdKeyScp02Kvn20AesDek,
|
||||||
|
#p13n.SdKeyScp02Kvn20AesEnc,
|
||||||
|
#p13n.SdKeyScp02Kvn20AesMac,
|
||||||
|
#p13n.SdKeyScp02Kvn21AesDek,
|
||||||
|
p13n.SdKeyScp02Kvn21AesEnc,
|
||||||
|
#p13n.SdKeyScp02Kvn21AesMac,
|
||||||
|
#p13n.SdKeyScp02Kvn22AesDek,
|
||||||
|
#p13n.SdKeyScp02Kvn22AesEnc,
|
||||||
|
p13n.SdKeyScp02Kvn22AesMac,
|
||||||
|
#p13n.SdKeyScp02KvnffAesDek,
|
||||||
|
#p13n.SdKeyScp02KvnffAesEnc,
|
||||||
|
#p13n.SdKeyScp02KvnffAesMac,
|
||||||
|
p13n.SdKeyScp03Kvn30AesDek,
|
||||||
|
#p13n.SdKeyScp03Kvn30AesEnc,
|
||||||
|
#p13n.SdKeyScp03Kvn30AesMac,
|
||||||
|
#p13n.SdKeyScp03Kvn31AesDek,
|
||||||
|
p13n.SdKeyScp03Kvn31AesEnc,
|
||||||
|
#p13n.SdKeyScp03Kvn31AesMac,
|
||||||
|
#p13n.SdKeyScp03Kvn32AesDek,
|
||||||
|
#p13n.SdKeyScp03Kvn32AesEnc,
|
||||||
|
p13n.SdKeyScp03Kvn32AesMac,
|
||||||
|
#p13n.SdKeyScp80Kvn01AesDek,
|
||||||
|
#p13n.SdKeyScp80Kvn01AesEnc,
|
||||||
|
#p13n.SdKeyScp80Kvn01AesMac,
|
||||||
|
p13n.SdKeyScp80Kvn01DesDek,
|
||||||
|
#p13n.SdKeyScp80Kvn01DesEnc,
|
||||||
|
#p13n.SdKeyScp80Kvn01DesMac,
|
||||||
|
#p13n.SdKeyScp80Kvn02AesDek,
|
||||||
|
p13n.SdKeyScp80Kvn02AesEnc,
|
||||||
|
#p13n.SdKeyScp80Kvn02AesMac,
|
||||||
|
#p13n.SdKeyScp80Kvn02DesDek,
|
||||||
|
#p13n.SdKeyScp80Kvn02DesEnc,
|
||||||
|
p13n.SdKeyScp80Kvn02DesMac,
|
||||||
|
#p13n.SdKeyScp80Kvn03AesDek,
|
||||||
|
#p13n.SdKeyScp80Kvn03AesEnc,
|
||||||
|
#p13n.SdKeyScp80Kvn03AesMac,
|
||||||
|
p13n.SdKeyScp80Kvn03DesDek,
|
||||||
|
#p13n.SdKeyScp80Kvn03DesEnc,
|
||||||
|
#p13n.SdKeyScp80Kvn03DesMac,
|
||||||
|
p13n.SdKeyScp81Kvn40AesDek,
|
||||||
|
#p13n.SdKeyScp81Kvn40Tlspsk,
|
||||||
|
#p13n.SdKeyScp81Kvn41AesDek,
|
||||||
|
p13n.SdKeyScp81Kvn41Tlspsk,
|
||||||
|
#p13n.SdKeyScp81Kvn42AesDek,
|
||||||
|
#p13n.SdKeyScp81Kvn42Tlspsk,
|
||||||
|
):
|
||||||
|
|
||||||
|
for key_len in sdkey_cls.allow_len:
|
||||||
|
val = '0102030405060708091011121314151617181920212223242526272829303132'
|
||||||
|
expect_clean_val = (b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||||
|
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32')
|
||||||
|
expect_val = '0102030405060708091011121314151617181920212223242526272829303132'
|
||||||
|
|
||||||
|
val = val[:key_len*2]
|
||||||
|
expect_clean_val = expect_clean_val[:key_len]
|
||||||
|
expect_val = val
|
||||||
|
|
||||||
|
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||||
|
|
||||||
|
# test bytes input
|
||||||
|
val = expect_clean_val
|
||||||
|
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||||
|
|
||||||
|
# test bytearray input
|
||||||
|
val = bytearray(expect_clean_val)
|
||||||
|
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||||
|
|
||||||
|
# test BytesIO input
|
||||||
|
val = io.BytesIO(expect_clean_val)
|
||||||
|
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||||
|
|
||||||
|
if key_len == 16:
|
||||||
|
# test huge integer input.
|
||||||
|
# needs to start with nonzero.. stupid
|
||||||
|
val = 11020304050607080910111213141516
|
||||||
|
expect_clean_val = (b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16')
|
||||||
|
expect_val = '11020304050607080910111213141516'
|
||||||
|
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||||
|
|
||||||
|
outputs = []
|
||||||
|
|
||||||
|
for upp_fname in upp_fnames:
|
||||||
|
test_idx = -1
|
||||||
|
try:
|
||||||
|
|
||||||
|
der = resources.read_binary(smdpp_data.upp, upp_fname)
|
||||||
|
|
||||||
|
for t in param_tests:
|
||||||
|
test_idx += 1
|
||||||
|
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
|
||||||
|
|
||||||
|
param = None
|
||||||
|
try:
|
||||||
|
param = t.param_cls()
|
||||||
|
param.input_value = t.val
|
||||||
|
param.validate()
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f'{logloc}: {e}') from e
|
||||||
|
|
||||||
|
clean_val = param.value
|
||||||
|
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
|
||||||
|
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
|
||||||
|
raise ValueError(f'{logloc}: expected'
|
||||||
|
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
|
||||||
|
|
||||||
|
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
|
||||||
|
# pes = copy.deepcopy(orig_pes)
|
||||||
|
pes = ProfileElementSequence.from_der(der)
|
||||||
|
try:
|
||||||
|
param.apply(pes)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
|
||||||
|
|
||||||
|
changed_der = pes.to_der()
|
||||||
|
|
||||||
|
pes2 = ProfileElementSequence.from_der(changed_der)
|
||||||
|
|
||||||
|
read_back_val = t.param_cls.get_value_from_pes(pes2)
|
||||||
|
|
||||||
|
# compose log string to show the precise type of dict values
|
||||||
|
if isinstance(read_back_val, dict):
|
||||||
|
types = set()
|
||||||
|
for v in read_back_val.values():
|
||||||
|
types.add(f'{type(v).__name__}')
|
||||||
|
|
||||||
|
read_back_val_type = '{' + ', '.join(types) + '}'
|
||||||
|
else:
|
||||||
|
read_back_val_type = f'{type(read_back_val).__name__}'
|
||||||
|
|
||||||
|
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
|
||||||
|
|
||||||
|
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
|
||||||
|
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
|
||||||
|
|
||||||
|
expect_val = t.expect_val
|
||||||
|
if not isinstance(expect_val, dict):
|
||||||
|
expect_val = { t.param_cls.get_name(): expect_val }
|
||||||
|
if read_back_val != expect_val:
|
||||||
|
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
|
||||||
|
|
||||||
|
ok = logloc.replace(' clean_val', '\n\tclean_val'
|
||||||
|
).replace(' read_back_val', '\n\tread_back_val'
|
||||||
|
).replace('=', '=\t'
|
||||||
|
)
|
||||||
|
output = f'\nok: {ok}'
|
||||||
|
outputs.append(output)
|
||||||
|
print(output)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
|
||||||
|
|
||||||
|
output = '\n'.join(outputs) + '\n'
|
||||||
|
xo_name = 'test_configurable_parameters'
|
||||||
|
if update_expected_output:
|
||||||
|
with resources.path(xo, xo_name) as xo_path:
|
||||||
|
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(output)
|
||||||
|
else:
|
||||||
|
xo_str = resources.read_text(xo, xo_name)
|
||||||
|
if xo_str != output:
|
||||||
|
at = 0
|
||||||
|
while at < len(output):
|
||||||
|
if output[at] == xo_str[at]:
|
||||||
|
at += 1
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
|
||||||
|
|
||||||
|
|
||||||
|
class TestValidateVal(unittest.TestCase):
|
||||||
|
"""validate_val() tests for various ConfigurableParameter subclasses."""
|
||||||
|
|
||||||
|
def _ok(self, cls, val, expected=None):
|
||||||
|
result = cls.validate_val(val)
|
||||||
|
if expected is not None:
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _err(self, cls, val):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
cls.validate_val(val)
|
||||||
|
|
||||||
|
# --- Iccid ---
|
||||||
|
|
||||||
|
def test_iccid_18digits_adds_luhn(self):
|
||||||
|
result = self._ok(p13n.Iccid, '998877665544332211')
|
||||||
|
self.assertIsInstance(result, str)
|
||||||
|
self.assertEqual(len(result), 19)
|
||||||
|
self.assertTrue(result.isdecimal())
|
||||||
|
|
||||||
|
def test_iccid_19digits_passthrough(self):
|
||||||
|
result = self._ok(p13n.Iccid, '9988776655443322110')
|
||||||
|
self.assertIsInstance(result, str)
|
||||||
|
self.assertEqual(len(result), 19)
|
||||||
|
|
||||||
|
def test_iccid_too_short(self):
|
||||||
|
self._err(p13n.Iccid, '12345678901234567') # 17 digits
|
||||||
|
|
||||||
|
def test_iccid_too_long(self):
|
||||||
|
self._err(p13n.Iccid, '1' * 21)
|
||||||
|
|
||||||
|
def test_iccid_non_digits(self):
|
||||||
|
self._err(p13n.Iccid, '99887766554433221X')
|
||||||
|
|
||||||
|
# --- Imsi ---
|
||||||
|
|
||||||
|
def test_imsi_valid_short(self):
|
||||||
|
self._ok(p13n.Imsi, '001010', '001010')
|
||||||
|
|
||||||
|
def test_imsi_valid_long(self):
|
||||||
|
self._ok(p13n.Imsi, '001010123456789', '001010123456789')
|
||||||
|
|
||||||
|
def test_imsi_too_short(self):
|
||||||
|
self._err(p13n.Imsi, '12345') # 5 digits, min is 6
|
||||||
|
|
||||||
|
def test_imsi_too_long(self):
|
||||||
|
self._err(p13n.Imsi, '1' * 16)
|
||||||
|
|
||||||
|
def test_imsi_non_digits(self):
|
||||||
|
self._err(p13n.Imsi, '00101A123456789')
|
||||||
|
|
||||||
|
# --- Pin1 ---
|
||||||
|
|
||||||
|
def test_pin1_4digits(self):
|
||||||
|
# DecimalHexParam encodes each digit as its ASCII byte, then rpad to 8 bytes with 0xff
|
||||||
|
self._ok(p13n.Pin1, '1234', b'1234\xff\xff\xff\xff')
|
||||||
|
|
||||||
|
def test_pin1_8digits(self):
|
||||||
|
self._ok(p13n.Pin1, '12345678', b'12345678')
|
||||||
|
|
||||||
|
def test_pin1_too_short(self):
|
||||||
|
self._err(p13n.Pin1, '123')
|
||||||
|
|
||||||
|
def test_pin1_too_long(self):
|
||||||
|
self._err(p13n.Pin1, '123456789')
|
||||||
|
|
||||||
|
def test_pin1_non_digits(self):
|
||||||
|
self._err(p13n.Pin1, '123A')
|
||||||
|
|
||||||
|
# --- Puk1 ---
|
||||||
|
|
||||||
|
def test_puk1_8digits(self):
|
||||||
|
self._ok(p13n.Puk1, '12345678', b'12345678')
|
||||||
|
|
||||||
|
def test_puk1_wrong_length(self):
|
||||||
|
self._err(p13n.Puk1, '1234567') # 7 digits
|
||||||
|
self._err(p13n.Puk1, '123456789') # 9 digits
|
||||||
|
|
||||||
|
def test_puk1_non_digits(self):
|
||||||
|
self._err(p13n.Puk1, '1234567X')
|
||||||
|
|
||||||
|
# --- K (BinaryParam) ---
|
||||||
|
|
||||||
|
def test_k_valid_hex_str(self):
|
||||||
|
self._ok(p13n.K, '000102030405060708090a0b0c0d0e0f',
|
||||||
|
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f')
|
||||||
|
|
||||||
|
def test_k_valid_bytes(self):
|
||||||
|
raw = bytes(range(16))
|
||||||
|
self._ok(p13n.K, raw, raw)
|
||||||
|
|
||||||
|
def test_k_wrong_length(self):
|
||||||
|
self._err(p13n.K, '00' * 15) # 15 bytes, allow_len requires 16 or 32
|
||||||
|
|
||||||
|
def test_k_non_hex(self):
|
||||||
|
self._err(p13n.K, 'gg' * 16)
|
||||||
|
|
||||||
|
def test_k_odd_hex_digits(self):
|
||||||
|
self._err(p13n.K, '0' * 31) # odd number of hex digits
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnumParam(unittest.TestCase):
|
||||||
|
"""Tests for the EnumParam machinery, using AlgorithmID as the concrete subclass."""
|
||||||
|
|
||||||
|
# --- validate_val ---
|
||||||
|
|
||||||
|
def test_validate_by_name_exact(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('Milenage'), 1)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('TUAK'), 2)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('usim_test'), 3)
|
||||||
|
|
||||||
|
def test_validate_by_int(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val(1), 1)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val(2), 2)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val(3), 3)
|
||||||
|
|
||||||
|
def test_validate_fuzzy_case(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('milenage'), 1)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('MILENAGE'), 1)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('tuak'), 2)
|
||||||
|
|
||||||
|
def test_validate_fuzzy_hyphen_underscore(self):
|
||||||
|
# 'usim-test' has a hyphen; enum member is 'usim_test' — must fuzzy-match
|
||||||
|
self.assertEqual(p13n.AlgorithmID.validate_val('usim-test'), 3)
|
||||||
|
|
||||||
|
def test_validate_invalid_name(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
p13n.AlgorithmID.validate_val('unknown')
|
||||||
|
|
||||||
|
def test_validate_invalid_int(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
p13n.AlgorithmID.validate_val(99)
|
||||||
|
|
||||||
|
def test_validate_returns_int(self):
|
||||||
|
result = p13n.AlgorithmID.validate_val('Milenage')
|
||||||
|
self.assertIsInstance(result, int)
|
||||||
|
self.assertNotIsInstance(result, enum.Enum)
|
||||||
|
|
||||||
|
# --- map_name_to_val ---
|
||||||
|
|
||||||
|
def test_map_name_exact(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_name_to_val('Milenage'), 1)
|
||||||
|
|
||||||
|
def test_map_name_fuzzy(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_name_to_val('milenage'), 1)
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_name_to_val('usim-test'), 3)
|
||||||
|
|
||||||
|
def test_map_name_strict_raises(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
p13n.AlgorithmID.map_name_to_val('unknown', strict=True)
|
||||||
|
|
||||||
|
def test_map_name_nonstrict_returns_none(self):
|
||||||
|
self.assertIsNone(p13n.AlgorithmID.map_name_to_val('unknown', strict=False))
|
||||||
|
|
||||||
|
# --- map_val_to_name ---
|
||||||
|
|
||||||
|
def test_map_val_known(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_val_to_name(1), 'Milenage')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_val_to_name(2), 'TUAK')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.map_val_to_name(3), 'usim_test')
|
||||||
|
|
||||||
|
def test_map_val_unknown_nonstrict(self):
|
||||||
|
self.assertIsNone(p13n.AlgorithmID.map_val_to_name(99))
|
||||||
|
|
||||||
|
def test_map_val_unknown_strict(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
p13n.AlgorithmID.map_val_to_name(99, strict=True)
|
||||||
|
|
||||||
|
# --- name_normalize ---
|
||||||
|
|
||||||
|
def test_name_normalize(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.name_normalize('Milenage'), 'Milenage')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.name_normalize('milenage'), 'Milenage')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.name_normalize('usim-test'), 'usim_test')
|
||||||
|
|
||||||
|
# --- clean_name_str ---
|
||||||
|
|
||||||
|
def test_clean_name_str(self):
|
||||||
|
self.assertEqual(p13n.AlgorithmID.clean_name_str('usim-test'), 'usimtest')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.clean_name_str('usim_test'), 'usimtest')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.clean_name_str('Milenage'), 'milenage')
|
||||||
|
self.assertEqual(p13n.AlgorithmID.clean_name_str('foo bar!'), 'foobar')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if '-u' in sys.argv:
|
||||||
|
update_expected_output = True
|
||||||
|
sys.argv.remove('-u')
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
from pySim.profile import CardProfile
|
||||||
|
from pySim.ts_51_011 import CardProfileSIM
|
||||||
|
from pySim.ts_102_221 import CardProfileUICC
|
||||||
|
|
||||||
|
class TestDecodeSelectResponse_CardProfile(unittest.TestCase):
|
||||||
|
|
||||||
|
def decode_select_response(self, card_Profile: CardProfile, testcases: list[dict]):
|
||||||
|
for testcase in testcases:
|
||||||
|
resp_hex = testcase['resp_hex']
|
||||||
|
decoded = card_Profile.decode_select_response(resp_hex)
|
||||||
|
if testcase['decoded']:
|
||||||
|
self.assertEqual(decoded, testcase['decoded'])
|
||||||
|
else:
|
||||||
|
print("no testvector to compare against, assuming the following output is correct:")
|
||||||
|
print("resp_hex:", resp_hex)
|
||||||
|
print("decoded:", decoded)
|
||||||
|
|
||||||
|
def test_CardProfileSIM(self):
|
||||||
|
testcases = [
|
||||||
|
# MF
|
||||||
|
{"resp_hex" : "000000003f000100000000000981020c0400838a838a",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'mf'}}, 'proprietary_info': {'available_memory': 0}, 'file_id': '3f00', 'file_characteristics': '81', 'num_direct_child_df': 2, 'num_direct_child_ef': 12, 'num_chv_unblock_adm_codes': 4}},
|
||||||
|
# DF.TELECOM
|
||||||
|
{"resp_hex" : "000000007f100200000000000981000d0400838a838a",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'df'}}, 'proprietary_info': {'available_memory': 0}, 'file_id': '7f10', 'file_characteristics': '81', 'num_direct_child_df': 0, 'num_direct_child_ef': 13, 'num_chv_unblock_adm_codes': 4}},
|
||||||
|
# EF.MSISDN
|
||||||
|
{"resp_hex" : "000000346f40040011ffff0102011a",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'working_ef', 'structure': 'linear_fixed'}, 'record_len': 26, 'num_of_rec': 2}, 'proprietary_info': {}, 'file_id': '6f40', 'file_size': 52, 'access_conditions': '11ffff', 'life_cycle_status_int': 'creation'}},
|
||||||
|
# EF.ICCID
|
||||||
|
{"resp_hex" : "0000000a2fe204000cffff01020000",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'working_ef', 'structure': 'transparent'}}, 'proprietary_info': {}, 'file_id': '2fe2', 'file_size': 10, 'access_conditions': '0cffff', 'life_cycle_status_int': 'creation'}},
|
||||||
|
]
|
||||||
|
self.decode_select_response(CardProfileSIM, testcases)
|
||||||
|
|
||||||
|
def test_CardProfileUICC(self):
|
||||||
|
testcases = [
|
||||||
|
# MF
|
||||||
|
{"resp_hex" : "622c8202782183023f00a50c80017183040003a7388701018a01058b032f0601c60c90016083010183010a83010b",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'?\x00', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'`'}, {'key_reference': 1}, {'key_reference': 10}, {'key_reference': 11}]}},
|
||||||
|
# ADF.USIM
|
||||||
|
{"resp_hex" : "623d8202782183027fd0840ca0000000871002ff49ff0589a50c80017183040003a7388701018a01058b032f0601c60f90017083010183018183010a83010b",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'\x7f\xd0', 'df_name': b'\xa0\x00\x00\x00\x87\x10\x02\xffI\xff\x05\x89', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'p'}, {'key_reference': 1}, {'key_reference': 129}, {'key_reference': 10}, {'key_reference': 11}]}},
|
||||||
|
# ADF.ISIM
|
||||||
|
{"resp_hex" : "623d8202782183027fb0840ca0000000871004ff49ff0589a50c80017183040003a7388701018a01058b032f0601c60f90017083010183018183010a83010b",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'\x7f\xb0', 'df_name': b'\xa0\x00\x00\x00\x87\x10\x04\xffI\xff\x05\x89', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'p'}, {'key_reference': 1}, {'key_reference': 129}, {'key_reference': 10}, {'key_reference': 11}]}},
|
||||||
|
# EF.IMSI
|
||||||
|
{"resp_hex" : "62178202412183026f078a01058b036f060a80020009880138",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'working_ef', 'structure': 'transparent'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'o\x07', 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'o\x06', 'ef_arr_record_nr': 10}, 'file_size': 9, 'short_file_identifier': 7}},
|
||||||
|
# EF.ECC
|
||||||
|
{"resp_hex" : "621a82054221000e0283026fb78a01058b036f06088002001c880108",
|
||||||
|
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'working_ef', 'structure': 'linear_fixed'}, 'record_len': 14, 'num_of_rec': 2}, 'file_identifier': b'o\xb7', 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'o\x06', 'ef_arr_record_nr': 8}, 'file_size': 28, 'short_file_identifier': 1}},
|
||||||
|
]
|
||||||
|
self.decode_select_response(CardProfileUICC, testcases)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -21,7 +21,7 @@ import copy
|
|||||||
from osmocom.utils import h2b, b2h
|
from osmocom.utils import h2b, b2h
|
||||||
|
|
||||||
from pySim.esim.saip import *
|
from pySim.esim.saip import *
|
||||||
from pySim.esim.saip.personalization import *
|
from pySim.esim.saip import personalization
|
||||||
from pprint import pprint as pp
|
from pprint import pprint as pp
|
||||||
|
|
||||||
|
|
||||||
@@ -55,14 +55,56 @@ class SaipTest(unittest.TestCase):
|
|||||||
def test_personalization(self):
|
def test_personalization(self):
|
||||||
"""Test some of the personalization operations."""
|
"""Test some of the personalization operations."""
|
||||||
pes = copy.deepcopy(self.pes)
|
pes = copy.deepcopy(self.pes)
|
||||||
params = [Puk1('01234567'), Puk2(98765432), Pin1('1111'), Pin2(2222), Adm1('11111111'),
|
params = [personalization.Puk1('01234567'),
|
||||||
K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
|
personalization.Puk2(98765432),
|
||||||
|
personalization.Pin1('1111'),
|
||||||
|
personalization.Pin2(2222),
|
||||||
|
personalization.Adm1('11111111'),
|
||||||
|
personalization.K(h2b('000102030405060708090a0b0c0d0e0f')),
|
||||||
|
personalization.Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
|
||||||
for p in params:
|
for p in params:
|
||||||
p.validate()
|
p.validate()
|
||||||
p.apply(pes)
|
p.apply(pes)
|
||||||
# TODO: we don't actually test the results here, but we just verify there is no exception
|
# TODO: we don't actually test the results here, but we just verify there is no exception
|
||||||
pes.to_der()
|
pes.to_der()
|
||||||
|
|
||||||
|
def test_personalization2(self):
|
||||||
|
"""Test some of the personalization operations."""
|
||||||
|
cls = personalization.SdKeyScp80Kvn01DesEnc
|
||||||
|
pes = ProfileElementSequence.from_der(self.per_input)
|
||||||
|
prev_val = tuple(cls.get_values_from_pes(pes))
|
||||||
|
print(f'{prev_val=}')
|
||||||
|
self.assertTrue(prev_val)
|
||||||
|
|
||||||
|
set_val = '42342342342342342342342342342342'
|
||||||
|
param = cls(set_val)
|
||||||
|
param.validate()
|
||||||
|
param.apply(pes)
|
||||||
|
|
||||||
|
get_val1 = tuple(cls.get_values_from_pes(pes))
|
||||||
|
print(f'{get_val1=} {set_val=}')
|
||||||
|
self.assertEqual(get_val1, ({cls.name: set_val},))
|
||||||
|
|
||||||
|
get_val1b = tuple(cls.get_values_from_pes(pes))
|
||||||
|
print(f'{get_val1b=} {set_val=}')
|
||||||
|
self.assertEqual(get_val1b, ({cls.name: set_val},))
|
||||||
|
|
||||||
|
der = pes.to_der()
|
||||||
|
|
||||||
|
get_val1c = tuple(cls.get_values_from_pes(pes))
|
||||||
|
print(f'{get_val1c=} {set_val=}')
|
||||||
|
self.assertEqual(get_val1c, ({cls.name: set_val},))
|
||||||
|
|
||||||
|
# assertTrue to not dump the entire der.
|
||||||
|
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
|
||||||
|
# DER and the ConfigurableParameter subclass is buggy.
|
||||||
|
self.assertTrue(der != self.per_input)
|
||||||
|
|
||||||
|
pes2 = ProfileElementSequence.from_der(der)
|
||||||
|
get_val2 = tuple(cls.get_values_from_pes(pes2))
|
||||||
|
print(f'{get_val2=} {set_val=}')
|
||||||
|
self.assertEqual(get_val2, ({cls.name: set_val},))
|
||||||
|
|
||||||
def test_constructor_encode(self):
|
def test_constructor_encode(self):
|
||||||
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
||||||
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
||||||
|
|||||||
Executable
+206
@@ -0,0 +1,206 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||||
|
#
|
||||||
|
# Author: Neels Hofmeyr
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import math
|
||||||
|
from importlib import resources
|
||||||
|
import unittest
|
||||||
|
from pySim.esim.saip import param_source
|
||||||
|
|
||||||
|
import xo
|
||||||
|
update_expected_output = False
|
||||||
|
|
||||||
|
class D:
|
||||||
|
mandatory = set()
|
||||||
|
optional = set()
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||||
|
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||||
|
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||||
|
for k, v in kwargs.items():
|
||||||
|
setattr(self, k, v)
|
||||||
|
for k in self.optional:
|
||||||
|
if not hasattr(self, k):
|
||||||
|
setattr(self, k, None)
|
||||||
|
|
||||||
|
decimals = '0123456789'
|
||||||
|
hexadecimals = '0123456789abcdefABCDEF'
|
||||||
|
|
||||||
|
class FakeRandom:
|
||||||
|
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def next(cls):
|
||||||
|
cls.i = (cls.i + 1) % len(cls.vals)
|
||||||
|
return cls.vals[cls.i]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def randint(a, b):
|
||||||
|
d = b - a
|
||||||
|
n_bytes = math.ceil(math.log(d, 2))
|
||||||
|
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
|
||||||
|
return a + (r % (b - a))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def randbytes(n):
|
||||||
|
return bytes(FakeRandom.next() for i in range(n))
|
||||||
|
|
||||||
|
|
||||||
|
class ParamSourceTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_param_source(self):
|
||||||
|
|
||||||
|
class Paramtest(D):
|
||||||
|
mandatory = (
|
||||||
|
'param_source',
|
||||||
|
'n',
|
||||||
|
'expect',
|
||||||
|
)
|
||||||
|
optional = (
|
||||||
|
'expect_arg',
|
||||||
|
'csv_rows',
|
||||||
|
)
|
||||||
|
param_source: param_source.ParamSource
|
||||||
|
n: int
|
||||||
|
expect: object
|
||||||
|
expect_arg: object
|
||||||
|
csv_rows: object
|
||||||
|
|
||||||
|
def expect_const(t, vals):
|
||||||
|
return tuple(t.expect_arg) == tuple(vals)
|
||||||
|
|
||||||
|
def expect_random(t, vals):
|
||||||
|
chars = t.expect_arg.get('digits')
|
||||||
|
repetitions = (t.n - len(set(vals)))
|
||||||
|
if repetitions:
|
||||||
|
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
|
||||||
|
for val_i in range(len(vals)):
|
||||||
|
v = vals[val_i]
|
||||||
|
val_minlen = t.expect_arg.get('val_minlen')
|
||||||
|
val_maxlen = t.expect_arg.get('val_maxlen')
|
||||||
|
if len(v) < val_minlen or len(v) > val_maxlen:
|
||||||
|
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
|
||||||
|
f' {val_minlen}..{val_maxlen}')
|
||||||
|
|
||||||
|
if chars is not None and not all(c in chars for c in v):
|
||||||
|
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
|
||||||
|
return True
|
||||||
|
|
||||||
|
param_source_tests = [
|
||||||
|
Paramtest(param_source=param_source.ConstantSource.from_str('123'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_const,
|
||||||
|
expect_arg=('123', '123', '123')),
|
||||||
|
Paramtest(param_source=param_source.RandomDigitSource.from_str('12345'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': decimals,
|
||||||
|
'val_minlen': 5,
|
||||||
|
'val_maxlen': 5}),
|
||||||
|
Paramtest(param_source=param_source.RandomDigitSource.from_str('1..999'),
|
||||||
|
n=10,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': decimals,
|
||||||
|
'val_minlen': 1,
|
||||||
|
'val_maxlen': 3}),
|
||||||
|
Paramtest(param_source=param_source.RandomDigitSource.from_str('001..999'),
|
||||||
|
n=10,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': decimals,
|
||||||
|
'val_minlen': 3,
|
||||||
|
'val_maxlen': 3}),
|
||||||
|
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': hexadecimals,
|
||||||
|
'val_minlen': 8,
|
||||||
|
'val_maxlen': 8}),
|
||||||
|
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': hexadecimals,
|
||||||
|
'val_minlen': 8,
|
||||||
|
'val_maxlen': 8}),
|
||||||
|
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_random,
|
||||||
|
expect_arg={'digits': hexadecimals,
|
||||||
|
'val_minlen': 8,
|
||||||
|
'val_maxlen': 8}),
|
||||||
|
Paramtest(param_source=param_source.IncDigitSource.from_str('10001'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_const,
|
||||||
|
expect_arg=('10001', '10002', '10003')),
|
||||||
|
Paramtest(param_source=param_source.CsvSource('column_name'),
|
||||||
|
n=3,
|
||||||
|
expect=expect_const,
|
||||||
|
expect_arg=('first val', 'second val', 'third val'),
|
||||||
|
csv_rows=(
|
||||||
|
{'column_name': 'first val'},
|
||||||
|
{'column_name': 'second val'},
|
||||||
|
{'column_name': 'third val'},
|
||||||
|
)),
|
||||||
|
]
|
||||||
|
|
||||||
|
outputs = []
|
||||||
|
|
||||||
|
for t in param_source_tests:
|
||||||
|
try:
|
||||||
|
if hasattr(t.param_source, 'random_impl'):
|
||||||
|
t.param_source.random_impl = FakeRandom
|
||||||
|
|
||||||
|
vals = []
|
||||||
|
for i in range(t.n):
|
||||||
|
csv_row = None
|
||||||
|
if t.csv_rows is not None:
|
||||||
|
csv_row = t.csv_rows[i]
|
||||||
|
vals.append( t.param_source.get_next(csv_row=csv_row) )
|
||||||
|
if not t.expect(t, vals):
|
||||||
|
raise RuntimeError(f'invalid values returned: returned {vals}')
|
||||||
|
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
|
||||||
|
outputs.append(output)
|
||||||
|
print(output)
|
||||||
|
except RuntimeError as e:
|
||||||
|
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
|
||||||
|
|
||||||
|
output = '\n'.join(outputs) + '\n'
|
||||||
|
xo_name = 'test_param_src'
|
||||||
|
if update_expected_output:
|
||||||
|
with resources.path(xo, xo_name) as xo_path:
|
||||||
|
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(output)
|
||||||
|
else:
|
||||||
|
xo_str = resources.read_text(xo, xo_name)
|
||||||
|
if xo_str != output:
|
||||||
|
at = 0
|
||||||
|
while at < len(output):
|
||||||
|
if output[at] == xo_str[at]:
|
||||||
|
at += 1
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if '-u' in sys.argv:
|
||||||
|
update_expected_output = True
|
||||||
|
sys.argv.remove('-u')
|
||||||
|
unittest.main()
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,9 @@
|
|||||||
|
ok: ConstantSource vals=['123', '123', '123']
|
||||||
|
ok: RandomDigitSource vals=['13987', '49298', '55670']
|
||||||
|
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
|
||||||
|
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
|
||||||
|
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
|
||||||
|
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
|
||||||
|
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
|
||||||
|
ok: IncDigitSource vals=['10001', '10002', '10003']
|
||||||
|
ok: CsvSource vals=['first val', 'second val', 'third val']
|
||||||
Reference in New Issue
Block a user