Compare commits
1 Commits
neels/saip
...
osmith/wip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4ea1c9973 |
@@ -24,12 +24,20 @@ import argparse
|
||||
from Cryptodome.Cipher import AES
|
||||
from osmocom.utils import h2b, b2h, Hexstr
|
||||
|
||||
from pySim.card_key_provider import CardKeyFieldCryptor
|
||||
from pySim.card_key_provider import CardKeyProviderCsv
|
||||
|
||||
class CsvColumnEncryptor(CardKeyFieldCryptor):
|
||||
def dict_keys_to_upper(d: dict) -> dict:
|
||||
return {k.upper():v for k,v in d.items()}
|
||||
|
||||
class CsvColumnEncryptor:
|
||||
def __init__(self, filename: str, transport_keys: dict):
|
||||
self.filename = filename
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
self.transport_keys = dict_keys_to_upper(transport_keys)
|
||||
|
||||
def encrypt_col(self, colname:str, value: str) -> Hexstr:
|
||||
key = self.transport_keys[colname]
|
||||
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
|
||||
return b2h(cipher.encrypt(h2b(value)))
|
||||
|
||||
def encrypt(self) -> None:
|
||||
with open(self.filename, 'r') as infile:
|
||||
@@ -41,8 +49,9 @@ class CsvColumnEncryptor(CardKeyFieldCryptor):
|
||||
cw.writeheader()
|
||||
|
||||
for row in cr:
|
||||
for fieldname in cr.fieldnames:
|
||||
row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname])
|
||||
for key_colname in self.transport_keys:
|
||||
if key_colname in row:
|
||||
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
|
||||
cw.writerow(row)
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -62,5 +71,9 @@ if __name__ == "__main__":
|
||||
print("You must specify at least one key!")
|
||||
sys.exit(1)
|
||||
|
||||
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
|
||||
for name, key in csv_column_keys.items():
|
||||
print("Encrypting column %s using AES key %s" % (name, key))
|
||||
|
||||
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
|
||||
cce.encrypt()
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse
|
||||
from osmocom.utils import h2b, swap_nibbles
|
||||
from pySim.esim.es8p import ProfileMetadata
|
||||
|
||||
parser = argparse.ArgumentParser(description="""Utility program to generate profile metadata in the
|
||||
StoreMetadataRequest format based on input values from the command line.""")
|
||||
parser.add_argument('--iccid', required=True, help="ICCID of eSIM profile");
|
||||
parser.add_argument('--spn', required=True, help="Service Provider Name");
|
||||
parser.add_argument('--profile-name', required=True, help="eSIM Profile Name");
|
||||
parser.add_argument('--profile-class', choices=['test', 'operational', 'provisioning'],
|
||||
default='operational', help="Profile Class");
|
||||
parser.add_argument('--outfile', required=True, help="Output File Name");
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
iccid_bin = h2b(swap_nibbles(opts.iccid))
|
||||
pmd = ProfileMetadata(iccid_bin, spn=opts.spn, profile_name=opts.profile_name,
|
||||
profile_class=opts.profile_class)
|
||||
|
||||
with open(opts.outfile, 'wb') as f:
|
||||
f.write(pmd.gen_store_metadata_request())
|
||||
print("Written StoreMetadataRequest to '%s'" % opts.outfile)
|
||||
@@ -329,7 +329,7 @@ def do_info(pes: ProfileElementSequence, opts):
|
||||
print("Security domain Instance AID: %s" % b2h(sd.decoded['instance']['instanceAID']))
|
||||
# FIXME: 'applicationSpecificParametersC9' parsing to figure out enabled SCP
|
||||
for key in sd.keys:
|
||||
print("\t%s" % repr(key))
|
||||
print("\tKVN=0x%02x, KID=0x%02x, %s" % (key.key_version_number, key.key_identifier, key.key_components))
|
||||
|
||||
# RFM
|
||||
print()
|
||||
|
||||
@@ -18,7 +18,7 @@ sys.path.insert(0, os.path.abspath('..'))
|
||||
# -- Project information -----------------------------------------------------
|
||||
|
||||
project = 'osmopysim-usermanual'
|
||||
copyright = '2009-2025 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||
|
||||
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
|
||||
|
||||
@@ -40,21 +40,16 @@ osmo-smdpp currently
|
||||
Running osmo-smdpp
|
||||
------------------
|
||||
|
||||
osmo-smdpp comes with built-in TLS support which is enabled by default. However, it is always possible to
|
||||
disable the built-in TLS support if needed.
|
||||
|
||||
In order to use osmo-smdpp without the built-in TLS support, it has to be put behind a TLS reverse proxy,
|
||||
which terminates the ES9+ HTTPS traffic from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
||||
|
||||
NOTE: The built in TLS support in osmo-smdpp makes use of the python *twisted* framework. Older versions
|
||||
of this framework appear to have problems when using the example elliptic curve certificates (both NIST and
|
||||
Brainpool) from GSMA.
|
||||
osmo-smdpp does not have built-in TLS support as the used *twisted* framework appears to have
|
||||
problems when using the example elliptic curve certificates (both NIST and Brainpool) from GSMA.
|
||||
|
||||
So in order to use it, you have to put it behind a TLS reverse proxy, which terminates the ES9+
|
||||
HTTPS from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
||||
|
||||
nginx as TLS proxy
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If you chose to use `nginx` as TLS reverse proxy, you can use the following configuration snippet::
|
||||
If you use `nginx` as web server, you can use the following configuration snippet::
|
||||
|
||||
upstream smdpp {
|
||||
server localhost:8000;
|
||||
@@ -97,43 +92,32 @@ The `smdpp-data/upp` directory contains the UPP (Unprotected Profile Package) us
|
||||
commandline options
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Typically, you just run osmo-smdpp without any arguments, and it will bind its built-in HTTPS ES9+ interface to
|
||||
`localhost` TCP port 443. In this case an external TLS reverse proxy is not needed.
|
||||
Typically, you just run it without any arguments, and it will bind its plain-HTTP ES9+ interface to
|
||||
`localhost` TCP port 8000.
|
||||
|
||||
osmo-smdpp currently doesn't have any configuration file.
|
||||
|
||||
There are command line options for binding:
|
||||
|
||||
Bind the HTTPS ES9+ to a port other than 443::
|
||||
Bind the HTTP ES9+ to a port other than 8000::
|
||||
|
||||
./osmo-smdpp.py -p 8443
|
||||
|
||||
Disable the built-in TLS support and bind the plain-HTTP ES9+ to a port 8000::
|
||||
|
||||
./osmo-smdpp.py -p 8000 --nossl
|
||||
./osmo-smdpp.py -p 8001
|
||||
|
||||
Bind the HTTP ES9+ to a different local interface::
|
||||
|
||||
./osmo-smdpp.py -H 127.0.0.2
|
||||
./osmo-smdpp.py -H 127.0.0.1
|
||||
|
||||
DNS setup for your LPA
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
|
||||
|
||||
It must also accept the TLS certificates used by your TLS proxy. In case osmo-smdpp is used with built-in TLS support,
|
||||
it will use the certificates provided in smdpp-data.
|
||||
|
||||
NOTE: The HTTPS ES9+ interface cannot be addressed by the LPA directly via its IP address. The reason for this is that
|
||||
the included SGP.26 (DPtls) test certificates explicitly restrict the hostname to `testsmdpplus1.example.com` in the
|
||||
`X509v3 Subject Alternative Name` extension. Using a bare IP address as hostname may cause the certificate to be
|
||||
rejected by the LPA.
|
||||
|
||||
It must also accept the TLS certificates used by your TLS proxy.
|
||||
|
||||
Supported eUICC
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
If you run osmo-smdpp with the included SGP.26 (DPauth, DPpb) certificates, you must use an eUICC with matching SGP.26
|
||||
If you run osmo-smdpp with the included SGP.26 certificates, you must use an eUICC with matching SGP.26
|
||||
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
|
||||
in turn must be signed by that SGP.26 EUM certificate.
|
||||
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
#!/bin/sh
|
||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
||||
@@ -1,4 +0,0 @@
|
||||
#!/bin/sh -e
|
||||
set -x
|
||||
cd "$(dirname "$0")"
|
||||
ruff check .
|
||||
@@ -861,10 +861,10 @@ class SmDppHttpServer:
|
||||
|
||||
def main(argv):
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP(S) to", default="localhost")
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP(S) to", default=443)
|
||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
|
||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||
parser.add_argument("-s", "--nossl", help="disable built in SSL/TLS support", action='store_true', default=False)
|
||||
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
||||
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||
parser.add_argument("-b", "--brainpool", help="Use Brainpool curves instead of NIST",
|
||||
action='store_true', default=False)
|
||||
|
||||
151
pySim-shell.py
151
pySim-shell.py
@@ -22,25 +22,19 @@ from typing import List, Optional
|
||||
import json
|
||||
import traceback
|
||||
import re
|
||||
|
||||
import cmd2
|
||||
from packaging import version
|
||||
from cmd2 import style
|
||||
|
||||
import logging
|
||||
from pySim.log import PySimLogger
|
||||
from osmocom.utils import auto_uint8
|
||||
|
||||
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
|
||||
if version.parse(cmd2.__version__) < version.parse("2.3.0"):
|
||||
from cmd2 import fg, bg # pylint: disable=no-name-in-module
|
||||
RED = fg.red
|
||||
YELLOW = fg.yellow
|
||||
LIGHT_RED = fg.bright_red
|
||||
LIGHT_GREEN = fg.bright_green
|
||||
else:
|
||||
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
|
||||
RED = Fg.RED
|
||||
YELLOW = Fg.YELLOW
|
||||
LIGHT_RED = Fg.LIGHT_RED
|
||||
LIGHT_GREEN = Fg.LIGHT_GREEN
|
||||
from cmd2 import CommandSet, with_default_category, with_argparser
|
||||
@@ -69,12 +63,10 @@ from pySim.ts_102_222 import Ts102222Commands
|
||||
from pySim.gsm_r import DF_EIRENE
|
||||
from pySim.cat import ProactiveCommand
|
||||
|
||||
from pySim.card_key_provider import CardKeyProviderCsv
|
||||
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
||||
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
||||
|
||||
from pySim.app import init_card
|
||||
|
||||
log = PySimLogger.get("main")
|
||||
|
||||
class Cmd2Compat(cmd2.Cmd):
|
||||
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
|
||||
@@ -100,19 +92,15 @@ class PysimApp(Cmd2Compat):
|
||||
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
|
||||
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
|
||||
|
||||
def __init__(self, verbose, card, rs, sl, ch, script=None):
|
||||
def __init__(self, card, rs, sl, ch, script=None):
|
||||
if version.parse(cmd2.__version__) < version.parse("2.0.0"):
|
||||
kwargs = {'use_ipython': True}
|
||||
else:
|
||||
kwargs = {'include_ipy': True}
|
||||
|
||||
self.verbose = verbose
|
||||
self._onchange_verbose('verbose', False, self.verbose);
|
||||
|
||||
# pylint: disable=unexpected-keyword-arg
|
||||
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
|
||||
auto_load_commands=False, startup_script=script, **kwargs)
|
||||
PySimLogger.setup(self.poutput, {logging.WARN: YELLOW})
|
||||
self.intro = style(self.BANNER, fg=RED)
|
||||
self.default_category = 'pySim-shell built-in commands'
|
||||
self.card = None
|
||||
@@ -138,9 +126,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
self.add_settable(Settable2Compat('apdu_strict', bool,
|
||||
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
|
||||
onchange_cb=self._onchange_apdu_strict))
|
||||
self.add_settable(Settable2Compat('verbose', bool,
|
||||
'Enable/disable verbose logging', self,
|
||||
onchange_cb=self._onchange_verbose))
|
||||
self.equip(card, rs)
|
||||
|
||||
def equip(self, card, rs):
|
||||
@@ -225,13 +210,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
else:
|
||||
self.card._scc._tp.apdu_strict = False
|
||||
|
||||
def _onchange_verbose(self, param_name, old, new):
|
||||
PySimLogger.set_verbose(new)
|
||||
if new == True:
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
else:
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
class Cmd2ApduTracer(ApduTracer):
|
||||
def __init__(self, cmd2_app):
|
||||
self.cmd2 = cmd2_app
|
||||
@@ -499,23 +477,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
"""Echo (print) a string on the console"""
|
||||
self.poutput(' '.join(opts.STRING))
|
||||
|
||||
query_card_key_parser = argparse.ArgumentParser()
|
||||
query_card_key_parser.add_argument('FIELDS', help="fields to query", type=str, nargs='+')
|
||||
query_card_key_parser.add_argument('--key', help='lookup key (typically \'ICCID\' or \'EID\')',
|
||||
type=str, required=True)
|
||||
query_card_key_parser.add_argument('--value', help='lookup key match value (e.g \'8988211000000123456\')',
|
||||
type=str, required=True)
|
||||
@cmd2.with_argparser(query_card_key_parser)
|
||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||
def do_query_card_key(self, opts):
|
||||
"""Manually query the Card Key Provider"""
|
||||
result = card_key_provider_get(opts.FIELDS, opts.key, opts.value)
|
||||
self.poutput("Result:")
|
||||
if result == {}:
|
||||
self.poutput(" (none)")
|
||||
for k in result.keys():
|
||||
self.poutput(" %s: %s" % (str(k), str(result.get(k))))
|
||||
|
||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||
def do_version(self, opts):
|
||||
"""Print the pySim software version."""
|
||||
@@ -954,53 +915,36 @@ class Iso7816Commands(CommandSet):
|
||||
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def __select_pin_nr(pin_type:str, pin_nr:int) -> int:
|
||||
if pin_type:
|
||||
# pylint: disable=unsubscriptable-object
|
||||
return pin_names.inverse[pin_type]
|
||||
return pin_nr
|
||||
|
||||
@staticmethod
|
||||
def __add_pin_nr_to_ArgumentParser(chv_parser):
|
||||
group = chv_parser.add_mutually_exclusive_group()
|
||||
group.add_argument('--pin-type',
|
||||
choices=[x for x in pin_names.values()
|
||||
if (x.startswith('PIN') or x.startswith('2PIN'))],
|
||||
help='Specifiy pin type (default is PIN1)')
|
||||
group.add_argument('--pin-nr', type=auto_uint8, default=0x01,
|
||||
help='PIN Number, 1=PIN1, 0x81=2PIN1 or custom value (see also TS 102 221, Table 9.3")')
|
||||
|
||||
verify_chv_parser = argparse.ArgumentParser()
|
||||
verify_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(verify_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(verify_chv_parser)
|
||||
def do_verify_chv(self, opts):
|
||||
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
|
||||
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
|
||||
2PIN1 (see also TS 102 221 Section 9.5.1 / Table 9.3)."""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.verify_chv(pin_nr, h2b(pin))
|
||||
PIN2."""
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.verify_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV verification successful")
|
||||
|
||||
unblock_chv_parser = argparse.ArgumentParser()
|
||||
unblock_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
|
||||
help='PUK code value. If none given, CSV file will be queried')
|
||||
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(unblock_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(unblock_chv_parser)
|
||||
def do_unblock_chv(self, opts):
|
||||
"""Unblock PIN code using specified PUK code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||
puk = self.get_code(opts.PUK, "PUK" + str(pin_nr))
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
||||
puk = self.get_code(opts.PUK, "PUK" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.unblock_chv(
|
||||
pin_nr, h2b(puk), h2b(new_pin))
|
||||
opts.pin_nr, h2b(puk), h2b(new_pin))
|
||||
self._cmd.poutput("CHV unblock successful")
|
||||
|
||||
change_chv_parser = argparse.ArgumentParser()
|
||||
@@ -1008,42 +952,42 @@ class Iso7816Commands(CommandSet):
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(change_chv_parser)
|
||||
change_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
|
||||
@cmd2.with_argparser(change_chv_parser)
|
||||
def do_change_chv(self, opts):
|
||||
"""Change PIN code to a new PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.change_chv(
|
||||
pin_nr, h2b(pin), h2b(new_pin))
|
||||
opts.pin_nr, h2b(pin), h2b(new_pin))
|
||||
self._cmd.poutput("CHV change successful")
|
||||
|
||||
disable_chv_parser = argparse.ArgumentParser()
|
||||
disable_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
__add_pin_nr_to_ArgumentParser(disable_chv_parser)
|
||||
|
||||
@cmd2.with_argparser(disable_chv_parser)
|
||||
def do_disable_chv(self, opts):
|
||||
"""Disable PIN code using specified PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.disable_chv(pin_nr, h2b(pin))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.disable_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV disable successful")
|
||||
|
||||
enable_chv_parser = argparse.ArgumentParser()
|
||||
__add_pin_nr_to_ArgumentParser(enable_chv_parser)
|
||||
enable_chv_parser.add_argument(
|
||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
||||
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||
help='PIN code value. If none given, CSV file will be queried')
|
||||
|
||||
@cmd2.with_argparser(enable_chv_parser)
|
||||
def do_enable_chv(self, opts):
|
||||
"""Enable PIN code using specified PIN code"""
|
||||
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.enable_chv(pin_nr, h2b(pin))
|
||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
||||
(data, sw) = self._cmd.lchan.scc.enable_chv(opts.pin_nr, h2b(pin))
|
||||
self._cmd.poutput("CHV enable successful")
|
||||
|
||||
def do_deactivate_file(self, opts):
|
||||
@@ -1127,23 +1071,16 @@ argparse_add_reader_args(option_parser)
|
||||
global_group = option_parser.add_argument_group('General Options')
|
||||
global_group.add_argument('--script', metavar='PATH', default=None,
|
||||
help='script with pySim-shell commands to be executed automatically at start-up')
|
||||
global_group.add_argument('--csv', metavar='FILE',
|
||||
default=None, help='Read card data from CSV file')
|
||||
global_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help='per-CSV-column AES transport key')
|
||||
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
|
||||
help="Use automatic card handling machine")
|
||||
global_group.add_argument("--noprompt", help="Run in non interactive mode",
|
||||
action='store_true', default=False)
|
||||
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
|
||||
action='store_true', default=False)
|
||||
global_group.add_argument("--verbose", help="Enable verbose logging",
|
||||
action='store_true', default=False)
|
||||
|
||||
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
||||
card_key_group.add_argument('--csv', metavar='FILE',
|
||||
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
||||
help='Read card data from CSV file')
|
||||
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help=argparse.SUPPRESS, dest='column_key')
|
||||
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help='per-column AES transport key', dest='column_key')
|
||||
|
||||
adm_group = global_group.add_mutually_exclusive_group()
|
||||
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
||||
@@ -1158,27 +1095,23 @@ option_parser.add_argument("command", nargs='?',
|
||||
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
||||
help="Optional Arguments for command")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
startup_errors = False
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
# Ensure that we are able to print formatted warnings from the beginning.
|
||||
PySimLogger.setup(print, {logging.WARN: YELLOW})
|
||||
if (opts.verbose):
|
||||
PySimLogger.set_verbose(True)
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
else:
|
||||
PySimLogger.set_verbose(False)
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
# Register csv-file as card data provider, either from specified CSV
|
||||
# or from CSV file in home directory
|
||||
column_keys = {}
|
||||
for par in opts.column_key:
|
||||
csv_column_keys = {}
|
||||
for par in opts.csv_column_key:
|
||||
name, key = par.split(':')
|
||||
column_keys[name] = key
|
||||
if os.path.isfile(opts.csv):
|
||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
||||
csv_column_keys[name] = key
|
||||
csv_default = str(Path.home()) + "/.osmocom/pysim/card_data.csv"
|
||||
if opts.csv:
|
||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, csv_column_keys))
|
||||
if os.path.isfile(csv_default):
|
||||
card_key_provider_register(CardKeyProviderCsv(csv_default, csv_column_keys))
|
||||
|
||||
# Init card reader driver
|
||||
sl = init_reader(opts, proactive_handler = Proact())
|
||||
@@ -1194,7 +1127,7 @@ if __name__ == '__main__':
|
||||
# able to tolerate and recover from that.
|
||||
try:
|
||||
rs, card = init_card(sl, opts.skip_card_init)
|
||||
app = PysimApp(opts.verbose, card, rs, sl, ch)
|
||||
app = PysimApp(card, rs, sl, ch)
|
||||
except:
|
||||
startup_errors = True
|
||||
print("Card initialization (%s) failed with an exception:" % str(sl))
|
||||
@@ -1206,7 +1139,7 @@ if __name__ == '__main__':
|
||||
print(" it should also be noted that some readers may behave strangely when no card")
|
||||
print(" is inserted.)")
|
||||
print("")
|
||||
app = PysimApp(opts.verbose, None, None, sl, ch)
|
||||
app = PysimApp(None, None, sl, ch)
|
||||
|
||||
# If the user supplies an ADM PIN at via commandline args authenticate
|
||||
# immediately so that the user does not have to use the shell commands
|
||||
|
||||
@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
|
||||
operation with pySim-shell.
|
||||
"""
|
||||
|
||||
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
|
||||
# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier, Harald Welte
|
||||
@@ -31,161 +31,128 @@ operation with pySim-shell.
|
||||
from typing import List, Dict, Optional
|
||||
from Cryptodome.Cipher import AES
|
||||
from osmocom.utils import h2b, b2h
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
import abc
|
||||
import csv
|
||||
import logging
|
||||
|
||||
log = PySimLogger.get("CARDKEY")
|
||||
|
||||
card_key_providers = [] # type: List['CardKeyProvider']
|
||||
|
||||
class CardKeyFieldCryptor:
|
||||
"""
|
||||
A Card key field encryption class that may be used by Card key provider implementations to add support for
|
||||
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.).
|
||||
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key"
|
||||
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt
|
||||
the key material on demand.
|
||||
"""
|
||||
|
||||
# well-known groups of columns relate to a given functionality. This avoids having
|
||||
# to specify the same transport key N number of times, if the same key is used for multiple
|
||||
# fields of one group, like KIC+KID+KID of one SD.
|
||||
__CRYPT_GROUPS = {
|
||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||
# well-known groups of columns relate to a given functionality. This avoids having
|
||||
# to specify the same transport key N number of times, if the same key is used for multiple
|
||||
# fields of one group, like KIC+KID+KID of one SD.
|
||||
CRYPT_GROUPS = {
|
||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||
}
|
||||
|
||||
__IV = b'\x23' * 16
|
||||
|
||||
@staticmethod
|
||||
def __dict_keys_to_upper(d: dict) -> dict:
|
||||
return {k.upper():v for k,v in d.items()}
|
||||
|
||||
@staticmethod
|
||||
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
|
||||
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
||||
new_dict = {}
|
||||
for name, key in transport_keys.items():
|
||||
if name in crypt_groups:
|
||||
for field in crypt_groups[name]:
|
||||
new_dict[field] = key
|
||||
else:
|
||||
new_dict[name] = key
|
||||
return new_dict
|
||||
|
||||
def __init__(self, transport_keys: dict):
|
||||
"""
|
||||
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
|
||||
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
|
||||
|
||||
Args:
|
||||
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
||||
respective field (column) of the CSV. This is done so that different fields
|
||||
(columns) can use different transport keys, which is strongly recommended by
|
||||
GSMA FS.28
|
||||
"""
|
||||
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
|
||||
self.__CRYPT_GROUPS)
|
||||
for name, key in self.transport_keys.items():
|
||||
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
|
||||
|
||||
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
||||
"""
|
||||
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
|
||||
field name, otherwise the field is treated as plaintext and passed through as it is.
|
||||
|
||||
Args:
|
||||
field_name : name of the field to decrypt (used to identify which key to use)
|
||||
encrypted_val : encrypted field value
|
||||
|
||||
Returns:
|
||||
plaintext field value
|
||||
"""
|
||||
if not field_name.upper() in self.transport_keys:
|
||||
return encrypted_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
||||
|
||||
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
|
||||
"""
|
||||
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
|
||||
field name, otherwise the field is treated as non sensitive and passed through as it is.
|
||||
|
||||
Args:
|
||||
field_name : name of the field to decrypt (used to identify which key to use)
|
||||
encrypted_val : encrypted field value
|
||||
|
||||
Returns:
|
||||
plaintext field value
|
||||
"""
|
||||
if not field_name.upper() in self.transport_keys:
|
||||
return plaintext_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||
return b2h(cipher.encrypt(h2b(plaintext_val)))
|
||||
|
||||
class CardKeyProvider(abc.ABC):
|
||||
"""Base class, not containing any concrete implementation."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
"""
|
||||
Get multiple card-individual fields for identified card. This method should not fail with an exception in
|
||||
case the entry, columns or even the key column itsself is not found.
|
||||
VALID_KEY_FIELD_NAMES = ['ICCID', 'EID', 'IMSI' ]
|
||||
|
||||
# check input parameters, but do nothing concrete yet
|
||||
def _verify_get_data(self, fields: List[str] = [], key: str = 'ICCID', value: str = "") -> Dict[str, str]:
|
||||
"""Verify multiple fields for identified card.
|
||||
|
||||
Args:
|
||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
||||
key : look-up key to identify card data, such as 'ICCID'
|
||||
value : value for look-up key to identify card data
|
||||
Returns:
|
||||
dictionary of {field : value, ...} strings for each requested field from 'fields'. In case nothing is
|
||||
fond None shall be returned.
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
|
||||
if key not in self.VALID_KEY_FIELD_NAMES:
|
||||
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
|
||||
(key, str(self.VALID_KEY_FIELD_NAMES)))
|
||||
|
||||
return {}
|
||||
|
||||
def get_field(self, field: str, key: str = 'ICCID', value: str = "") -> Optional[str]:
|
||||
"""get a single field from CSV file using a specified key/value pair"""
|
||||
fields = [field]
|
||||
result = self.get(fields, key, value)
|
||||
return result.get(field)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
"""Get multiple card-individual fields for identified card.
|
||||
|
||||
Args:
|
||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
||||
key : look-up key to identify card data, such as 'ICCID'
|
||||
value : value for look-up key to identify card data
|
||||
Returns:
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return type(self).__name__
|
||||
|
||||
class CardKeyProviderCsv(CardKeyProvider):
|
||||
"""Card key provider implementation that allows to query against a specified CSV file."""
|
||||
"""Card key provider implementation that allows to query against a specified CSV file.
|
||||
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
|
||||
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
|
||||
known as "transport key" (see GSMA FS.28)."""
|
||||
IV = b'\x23' * 16
|
||||
csv_file = None
|
||||
filename = None
|
||||
|
||||
def __init__(self, csv_filename: str, transport_keys: dict):
|
||||
def __init__(self, filename: str, transport_keys: dict):
|
||||
"""
|
||||
Args:
|
||||
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : (see class CardKeyFieldCryptor)
|
||||
filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
||||
respective field (column) of the CSV. This is done so that different fields
|
||||
(columns) can use different transport keys, which is strongly recommended by
|
||||
GSMA FS.28
|
||||
"""
|
||||
self.csv_file = open(csv_filename, 'r')
|
||||
self.csv_file = open(filename, 'r')
|
||||
if not self.csv_file:
|
||||
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||
self.csv_filename = csv_filename
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
raise RuntimeError("Could not open CSV file '%s'" % filename)
|
||||
self.filename = filename
|
||||
self.transport_keys = self.process_transport_keys(transport_keys)
|
||||
|
||||
@staticmethod
|
||||
def process_transport_keys(transport_keys: dict):
|
||||
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
||||
new_dict = {}
|
||||
for name, key in transport_keys.items():
|
||||
if name in CRYPT_GROUPS:
|
||||
for field in CRYPT_GROUPS[name]:
|
||||
new_dict[field] = key
|
||||
else:
|
||||
new_dict[name] = key
|
||||
return new_dict
|
||||
|
||||
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
||||
"""decrypt a single field, if we have a transport key for the field of that name."""
|
||||
if not field_name in self.transport_keys:
|
||||
return encrypted_val
|
||||
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
|
||||
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
||||
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
super()._verify_get_data(fields, key, value)
|
||||
|
||||
self.csv_file.seek(0)
|
||||
cr = csv.DictReader(self.csv_file)
|
||||
if not cr:
|
||||
raise RuntimeError("Could not open DictReader for CSV-File '%s'" % self.csv_filename)
|
||||
raise RuntimeError(
|
||||
"Could not open DictReader for CSV-File '%s'" % self.filename)
|
||||
cr.fieldnames = [field.upper() for field in cr.fieldnames]
|
||||
|
||||
if key not in cr.fieldnames:
|
||||
return None
|
||||
return_dict = {}
|
||||
rc = {}
|
||||
for row in cr:
|
||||
if row[key] == value:
|
||||
for f in fields:
|
||||
if f in row:
|
||||
return_dict.update({f: self.crypt.decrypt_field(f, row[f])})
|
||||
rc.update({f: self._decrypt_field(f, row[f])})
|
||||
else:
|
||||
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.csv_filename, f))
|
||||
if return_dict == {}:
|
||||
return None
|
||||
return return_dict
|
||||
|
||||
raise RuntimeError("CSV-File '%s' lacks column '%s'" %
|
||||
(self.filename, f))
|
||||
return rc
|
||||
|
||||
|
||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||
@@ -200,7 +167,7 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
|
||||
provider_list.append(provider)
|
||||
|
||||
|
||||
def card_key_provider_get(fields: list[str], key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
||||
def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
||||
"""Query all registered card data providers for card-individual [key] data.
|
||||
|
||||
Args:
|
||||
@@ -211,21 +178,17 @@ def card_key_provider_get(fields: list[str], key: str, value: str, provider_list
|
||||
Returns:
|
||||
dictionary of {field, value} strings for each requested field from 'fields'
|
||||
"""
|
||||
key = key.upper()
|
||||
fields = [f.upper() for f in fields]
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError("Provider list contains element which is not a card data provider")
|
||||
log.debug("Searching for card key data (key=%s, value=%s, provider=%s)" % (key, value, str(p)))
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provider")
|
||||
result = p.get(fields, key, value)
|
||||
if result:
|
||||
log.debug("Found card data: %s" % (str(result)))
|
||||
return result
|
||||
|
||||
raise ValueError("Unable to find card key data (key=%s, value=%s, fields=%s)" % (key, value, str(fields)))
|
||||
return {}
|
||||
|
||||
|
||||
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> str:
|
||||
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> Optional[str]:
|
||||
"""Query all registered card data providers for a single field.
|
||||
|
||||
Args:
|
||||
@@ -236,7 +199,11 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
|
||||
Returns:
|
||||
dictionary of {field, value} strings for the requested field
|
||||
"""
|
||||
|
||||
fields = [field]
|
||||
result = card_key_provider_get(fields, key, value, card_key_providers)
|
||||
return result.get(field.upper())
|
||||
for p in provider_list:
|
||||
if not isinstance(p, CardKeyProvider):
|
||||
raise ValueError(
|
||||
"provider list contains element which is not a card data provider")
|
||||
result = p.get_field(field, key, value)
|
||||
if result:
|
||||
return result
|
||||
return None
|
||||
|
||||
@@ -141,7 +141,7 @@ class SimCardCommands:
|
||||
Returns:
|
||||
Tuple of (decoded_data, sw)
|
||||
"""
|
||||
cmd = cmd_constr.build(cmd_data) if cmd_data else b''
|
||||
cmd = cmd_constr.build(cmd_data) if cmd_data else ''
|
||||
lc = i2h([len(cmd)]) if cmd_data else ''
|
||||
le = '00' if resp_constr else ''
|
||||
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
|
||||
|
||||
@@ -76,11 +76,10 @@ def gen_replace_session_keys(ppk_enc: bytes, ppk_cmac: bytes, initial_mcv: bytes
|
||||
class ProfileMetadata:
|
||||
"""Representation of Profile metadata. Right now only the mandatory bits are
|
||||
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
|
||||
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str, profile_class = 'operational'):
|
||||
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str):
|
||||
self.iccid_bin = iccid_bin
|
||||
self.spn = spn
|
||||
self.profile_name = profile_name
|
||||
self.profile_class = profile_class
|
||||
self.icon = None
|
||||
self.icon_type = None
|
||||
self.notifications = []
|
||||
@@ -106,14 +105,6 @@ class ProfileMetadata:
|
||||
'serviceProviderName': self.spn,
|
||||
'profileName': self.profile_name,
|
||||
}
|
||||
if self.profile_class == 'test':
|
||||
smr['profileClass'] = 0
|
||||
elif self.profile_class == 'provisioning':
|
||||
smr['profileClass'] = 1
|
||||
elif self.profile_class == 'operational':
|
||||
smr['profileClass'] = 2
|
||||
else:
|
||||
raise ValueError('Unsupported Profile Class %s' % self.profile_class)
|
||||
if self.icon:
|
||||
smr['icon'] = self.icon
|
||||
smr['iconType'] = self.icon_type
|
||||
|
||||
@@ -985,9 +985,9 @@ class SecurityDomainKey:
|
||||
self.key_components = key_components
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=0x%x, Comp=%s)' % (self.key_version_number,
|
||||
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=%s, Comp=%s)' % (self.key_version_number,
|
||||
self.key_identifier,
|
||||
build_construct(KeyUsageQualifier, self.key_usage_qualifier)[0],
|
||||
self.key_usage_qualifier,
|
||||
repr(self.key_components))
|
||||
|
||||
@classmethod
|
||||
@@ -1006,13 +1006,6 @@ class SecurityDomainKey:
|
||||
'keyVersionNumber': bytes([self.key_version_number]),
|
||||
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
||||
|
||||
def get_key_component(self, key_type):
|
||||
for kc in self.key_components:
|
||||
if kc.key_type == key_type:
|
||||
return kc.key_data
|
||||
return None
|
||||
|
||||
|
||||
class ProfileElementSD(ProfileElement):
|
||||
"""Class representing a securityDomain ProfileElement."""
|
||||
type = 'securityDomain'
|
||||
|
||||
@@ -1,237 +0,0 @@
|
||||
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
|
||||
#
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: nhofmeyr@sysmocom.de
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import secrets
|
||||
import re
|
||||
from pySim.utils import all_subclasses_of
|
||||
from osmocom.utils import b2h
|
||||
|
||||
class ParamSourceExn(Exception):
|
||||
pass
|
||||
|
||||
class ParamSourceExhaustedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSourceUndefinedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSource:
|
||||
'abstract parameter source. For usage, see personalization.BatchPersonalization.'
|
||||
is_abstract = True
|
||||
|
||||
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
|
||||
name = 'none'
|
||||
|
||||
@classmethod
|
||||
def get_all_implementations(cls, blacklist=None):
|
||||
"return all subclasses of ParamSource that have is_abstract = False."
|
||||
# return a set() so that multiple inheritance does not return dups
|
||||
return set(c
|
||||
for c in all_subclasses_of(cls)
|
||||
if (not c.is_abstract) and ((not blacklist) or (c not in blacklist))
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
'''Subclasses implement this:
|
||||
if a parameter source defines some string input magic, override this function.
|
||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
||||
so the user can enter '0000' to get a four digit random number.'''
|
||||
return cls(s)
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
'''Subclasses implement this: return the next value from the parameter source.
|
||||
When there are no more values from the source, raise a ParamSourceExhaustedExn.'''
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
|
||||
class ConstantSource(ParamSource):
|
||||
'one value for all'
|
||||
is_abstract = False
|
||||
name = 'constant'
|
||||
|
||||
def __init__(self, val:str):
|
||||
self.val = val
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
return self.val
|
||||
|
||||
class InputExpandingParamSource(ParamSource):
|
||||
|
||||
@classmethod
|
||||
def expand_str(cls, s:str):
|
||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||
if '*' not in s:
|
||||
return s
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
||||
if len(tokens) < 3:
|
||||
return s
|
||||
parts = []
|
||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||
parts.append(unchanged)
|
||||
repeat = int(repeat_str)
|
||||
parts.append(snippet * repeat)
|
||||
return ''.join(parts)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
return cls(cls.expand_str(s))
|
||||
|
||||
class RandomSourceMixin:
|
||||
random_impl = secrets.SystemRandom()
|
||||
|
||||
class RandomDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
'return a different sequence of random decimal digits each'
|
||||
is_abstract = False
|
||||
name = 'random decimal digits'
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
"""
|
||||
See also from_str().
|
||||
|
||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
||||
num_digits: number of random digits (possibly with leading zeros) to generate.
|
||||
first_value, last_value: the decimal range in which to provide random digits.
|
||||
"""
|
||||
num_digits = int(num_digits)
|
||||
first_value = int(first_value)
|
||||
last_value = int(last_value)
|
||||
assert num_digits > 0
|
||||
assert first_value <= last_value
|
||||
self.num_digits = num_digits
|
||||
self.val_first_last = (first_value, last_value)
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random digits that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randint(*self.val_first_last)
|
||||
if val in RandomDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomDigitSource.used_keys.add(val)
|
||||
break
|
||||
return self.val_to_digit(val)
|
||||
|
||||
def val_to_digit(self, val:int):
|
||||
return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
|
||||
if '..' in s:
|
||||
first_str, last_str = s.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = s.strip()
|
||||
last_str = None
|
||||
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str) if last_str is not None else '9' * len(first_str)
|
||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
||||
|
||||
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
'return a different sequence of random hexadecimal digits each'
|
||||
is_abstract = False
|
||||
name = 'random hexadecimal digits'
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits):
|
||||
'see from_str()'
|
||||
num_digits = int(num_digits)
|
||||
if num_digits < 1:
|
||||
raise ValueError('zero number of digits')
|
||||
# hex digits always come in two
|
||||
if (num_digits & 1) != 0:
|
||||
raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}')
|
||||
self.num_digits = num_digits
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random bytes that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randbytes(self.num_digits // 2)
|
||||
if val in RandomHexDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomHexDigitSource.used_keys.add(val)
|
||||
break
|
||||
|
||||
return b2h(val)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
return cls(num_digits=len(s.strip()))
|
||||
|
||||
class IncDigitSource(RandomDigitSource):
|
||||
'incrementing sequence of digits'
|
||||
is_abstract = False
|
||||
name = 'incrementing decimal digits'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"The arguments defining the number of digits and value range are identical to RandomDigitSource.__init__()."
|
||||
super().__init__(*args, **kwargs)
|
||||
self.next_val = None
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"Restart from the first value of the defined range passed to __init__()."
|
||||
self.next_val = self.val_first_last[0]
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = self.next_val
|
||||
if val is None:
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
returnval = self.val_to_digit(val)
|
||||
|
||||
val += 1
|
||||
if val > self.val_first_last[1]:
|
||||
self.next_val = None
|
||||
else:
|
||||
self.next_val = val
|
||||
|
||||
return returnval
|
||||
|
||||
class CsvSource(ParamSource):
|
||||
'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)'
|
||||
is_abstract = False
|
||||
name = 'from CSV'
|
||||
|
||||
def __init__(self, csv_column):
|
||||
"""
|
||||
csv_column: column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
self.csv_column = csv_column
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = None
|
||||
if csv_row:
|
||||
val = csv_row.get(self.csv_column)
|
||||
if not val:
|
||||
raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}')
|
||||
return val
|
||||
File diff suppressed because it is too large
Load Diff
@@ -103,26 +103,6 @@ class CheckBasicStructure(ProfileConstraintChecker):
|
||||
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
|
||||
raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
|
||||
|
||||
def check_mandatory_services_aka(self, pes: ProfileElementSequence):
|
||||
"""Ensure that no unnecessary authentication related services are marked as mandatory but not
|
||||
actually used within the profile"""
|
||||
m_svcs = pes.get_pe_for_type('header').decoded['eUICC-Mandatory-services']
|
||||
# list of tuples (algo_id, key_len_in_octets) for all the akaParameters in the PE Sequence
|
||||
algo_id_klen = [(x.decoded['algoConfiguration'][1]['algorithmID'],
|
||||
len(x.decoded['algoConfiguration'][1]['key'])) for x in pes.get_pes_for_type('akaParameter')]
|
||||
# just a plain list of algorithm IDs in akaParameters
|
||||
algorithm_ids = [x[0] for x in algo_id_klen]
|
||||
if 'milenage' in m_svcs and not 1 in algorithm_ids:
|
||||
raise ProfileError('milenage mandatory, but no related algorithm_id in akaParameter')
|
||||
if 'tuak128' in m_svcs and not (2, 128/8) in algo_id_klen:
|
||||
raise ProfileError('tuak128 mandatory, but no related algorithm_id in akaParameter')
|
||||
if 'cave' in m_svcs and not pes.get_pe_for_type('cdmaParameter'):
|
||||
raise ProfileError('cave mandatory, but no related cdmaParameter')
|
||||
if 'tuak256' in m_svcs and (2, 256/8) in algo_id_klen:
|
||||
raise ProfileError('tuak256 mandatory, but no related algorithm_id in akaParameter')
|
||||
if 'usim-test-algorithm' in m_svcs and not 3 in algorithm_ids:
|
||||
raise ProfileError('usim-test-algorithm mandatory, but no related algorithm_id in akaParameter')
|
||||
|
||||
def check_identification_unique(self, pes: ProfileElementSequence):
|
||||
"""Ensure that each PE has a unique identification value."""
|
||||
id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header]
|
||||
|
||||
@@ -91,7 +91,6 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
||||
|
||||
# Key Usage:
|
||||
# KVN 0x01 .. 0x0F reserved for SCP80
|
||||
# KVN 0x81 .. 0x8f reserved for SCP81
|
||||
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
||||
# KVN 0x20 .. 0x2F reserved for SCP02
|
||||
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
||||
|
||||
125
pySim/log.py
125
pySim/log.py
@@ -1,125 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" pySim: Logging
|
||||
"""
|
||||
|
||||
#
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import logging
|
||||
from cmd2 import style
|
||||
|
||||
class _PySimLogHandler(logging.Handler):
|
||||
def __init__(self, log_callback):
|
||||
super().__init__()
|
||||
self.log_callback = log_callback
|
||||
|
||||
def emit(self, record):
|
||||
formatted_message = self.format(record)
|
||||
self.log_callback(formatted_message, record)
|
||||
|
||||
class PySimLogger:
|
||||
"""
|
||||
Static class to centralize the log output of PySim applications. This class can be used to print log messages from
|
||||
any pySim module. Configuration of the log behaviour (see setup and set_ methods) is entirely optional. In case no
|
||||
print callback is set (see setup method), the logger will pass the log messages directly to print() without applying
|
||||
any formatting to the original log message.
|
||||
"""
|
||||
|
||||
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
||||
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
|
||||
__formatter = logging.Formatter(LOG_FMTSTR)
|
||||
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
||||
|
||||
# No print callback by default, means that log messages are passed directly to print()
|
||||
print_callback = None
|
||||
|
||||
# No specific color scheme by default
|
||||
colors = {}
|
||||
|
||||
# The logging default is non-verbose logging on logging level DEBUG. This is a safe default that works for
|
||||
# applications that ignore the presence of the PySimLogger class.
|
||||
verbose = False
|
||||
logging.root.setLevel(logging.DEBUG)
|
||||
|
||||
def __init__(self):
|
||||
raise RuntimeError('static class, do not instantiate')
|
||||
|
||||
@staticmethod
|
||||
def setup(print_callback = None, colors:dict = {}):
|
||||
"""
|
||||
Set a print callback function and color scheme. This function call is optional. In case this method is not
|
||||
called, default settings apply.
|
||||
Args:
|
||||
print_callback : A callback function that accepts the resulting log string as input. The callback should
|
||||
have the following format: print_callback(message:str)
|
||||
colors : An optional dict through which certain log levels can be assigned a color.
|
||||
(e.g. {logging.WARN: YELLOW})
|
||||
"""
|
||||
PySimLogger.print_callback = print_callback
|
||||
PySimLogger.colors = colors
|
||||
|
||||
@staticmethod
|
||||
def set_verbose(verbose:bool = False):
|
||||
"""
|
||||
Enable/disable verbose logging. (has no effect in case no print callback is set, see method setup)
|
||||
Args:
|
||||
verbose: verbosity (True = verbose logging, False = normal logging)
|
||||
"""
|
||||
PySimLogger.verbose = verbose;
|
||||
|
||||
@staticmethod
|
||||
def set_level(level:int = logging.DEBUG):
|
||||
"""
|
||||
Set the logging level.
|
||||
Args:
|
||||
level: Logging level, valis log leves are: DEBUG, INFO, WARNING, ERROR and CRITICAL
|
||||
"""
|
||||
logging.root.setLevel(level)
|
||||
|
||||
@staticmethod
|
||||
def _log_callback(message, record):
|
||||
if not PySimLogger.print_callback:
|
||||
# In case no print callback has been set display the message as if it were printed trough a normal
|
||||
# python print statement.
|
||||
print(record.message)
|
||||
else:
|
||||
# When a print callback is set, use it to display the log line. Apply color if the API user chose one
|
||||
if PySimLogger.verbose:
|
||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter_verbose, record)
|
||||
else:
|
||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
||||
color = PySimLogger.colors.get(record.levelno)
|
||||
if color:
|
||||
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||
else:
|
||||
PySimLogger.print_callback(formatted_message)
|
||||
|
||||
@staticmethod
|
||||
def get(log_facility: str):
|
||||
"""
|
||||
Set up and return a new python logger object
|
||||
Args:
|
||||
log_facility : Name of log facility (e.g. "MAIN", "RUNTIME"...)
|
||||
"""
|
||||
logger = logging.getLogger(log_facility)
|
||||
handler = _PySimLogHandler(log_callback=PySimLogger._log_callback)
|
||||
logger.addHandler(handler)
|
||||
return logger
|
||||
@@ -23,9 +23,6 @@ from osmocom.tlv import bertlv_parse_one
|
||||
|
||||
from pySim.exceptions import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
log = PySimLogger.get("RUNTIME")
|
||||
|
||||
def lchan_nr_from_cla(cla: int) -> int:
|
||||
"""Resolve the logical channel number from the CLA byte."""
|
||||
@@ -47,7 +44,6 @@ class RuntimeState:
|
||||
card : pysim.cards.Card instance
|
||||
profile : CardProfile instance
|
||||
"""
|
||||
|
||||
self.mf = CardMF(profile=profile)
|
||||
self.card = card
|
||||
self.profile = profile
|
||||
@@ -70,7 +66,7 @@ class RuntimeState:
|
||||
for addon_cls in self.profile.addons:
|
||||
addon = addon_cls()
|
||||
if addon.probe(self.card):
|
||||
log.info("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
||||
print("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
||||
for f in addon.files_in_mf:
|
||||
self.mf.add_file(f)
|
||||
|
||||
@@ -104,18 +100,18 @@ class RuntimeState:
|
||||
apps_taken = []
|
||||
if aids_card:
|
||||
aids_taken = []
|
||||
log.info("AIDs on card:")
|
||||
print("AIDs on card:")
|
||||
for a in aids_card:
|
||||
for f in apps_profile:
|
||||
if f.aid in a:
|
||||
log.info(" %s: %s (EF.DIR)" % (f.name, a))
|
||||
print(" %s: %s (EF.DIR)" % (f.name, a))
|
||||
aids_taken.append(a)
|
||||
apps_taken.append(f)
|
||||
aids_unknown = set(aids_card) - set(aids_taken)
|
||||
for a in aids_unknown:
|
||||
log.info(" unknown: %s (EF.DIR)" % a)
|
||||
print(" unknown: %s (EF.DIR)" % a)
|
||||
else:
|
||||
log.warn("EF.DIR seems to be empty!")
|
||||
print("warning: EF.DIR seems to be empty!")
|
||||
|
||||
# Some card applications may not be registered in EF.DIR, we will actively
|
||||
# probe for those applications
|
||||
@@ -130,7 +126,7 @@ class RuntimeState:
|
||||
_data, sw = self.card.select_adf_by_aid(f.aid)
|
||||
self.selected_adf = f
|
||||
if sw == "9000":
|
||||
log.info(" %s: %s" % (f.name, f.aid))
|
||||
print(" %s: %s" % (f.name, f.aid))
|
||||
apps_taken.append(f)
|
||||
except (SwMatchError, ProtocolError):
|
||||
pass
|
||||
@@ -477,15 +473,11 @@ class RuntimeLchan:
|
||||
|
||||
def get_file_for_filename(self, name: str):
|
||||
"""Get the related CardFile object for a specified filename."""
|
||||
if is_hex(name):
|
||||
name = name.lower()
|
||||
sels = self.selected_file.get_selectables()
|
||||
return sels[name]
|
||||
|
||||
def activate_file(self, name: str):
|
||||
"""Request ACTIVATE FILE of specified file."""
|
||||
if is_hex(name):
|
||||
name = name.lower()
|
||||
sels = self.selected_file.get_selectables()
|
||||
f = sels[name]
|
||||
data, sw = self.scc.activate_file(f.fid)
|
||||
@@ -518,47 +510,6 @@ class RuntimeLchan:
|
||||
dec_data = self.selected_file.decode_hex(data)
|
||||
return (dec_data, sw)
|
||||
|
||||
def __get_writeable_size(self):
|
||||
""" Determine the writable size (file or record) using the cached FCP parameters of the currently selected
|
||||
file. Return None in case the writeable size cannot be determined (no FCP available, FCP lacks size
|
||||
information).
|
||||
"""
|
||||
fcp = self.selected_file_fcp
|
||||
if not fcp:
|
||||
return None
|
||||
|
||||
structure = fcp.get('file_descriptor', {}).get('file_descriptor_byte', {}).get('structure')
|
||||
if not structure:
|
||||
return None
|
||||
|
||||
if structure == 'transparent':
|
||||
return fcp.get('file_size')
|
||||
elif structure == 'linear_fixed':
|
||||
return fcp.get('file_descriptor', {}).get('record_len')
|
||||
else:
|
||||
return None
|
||||
|
||||
def __check_writeable_size(self, data_len):
|
||||
""" Guard against unsuccessful writes caused by attempts to write data that exceeds the file limits. """
|
||||
|
||||
writeable_size = self.__get_writeable_size()
|
||||
if not writeable_size:
|
||||
return
|
||||
|
||||
if isinstance(self.selected_file, TransparentEF):
|
||||
writeable_name = "file"
|
||||
elif isinstance(self.selected_file, LinFixedEF):
|
||||
writeable_name = "record"
|
||||
else:
|
||||
writeable_name = "object"
|
||||
|
||||
if data_len > writeable_size:
|
||||
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
|
||||
(data_len, writeable_name, writeable_size, data_len - writeable_size))
|
||||
elif data_len < writeable_size:
|
||||
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
|
||||
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
|
||||
|
||||
def update_binary(self, data_hex: str, offset: int = 0):
|
||||
"""Update transparent EF binary data.
|
||||
|
||||
@@ -569,7 +520,6 @@ class RuntimeLchan:
|
||||
if not isinstance(self.selected_file, TransparentEF):
|
||||
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
|
||||
self.selected_file.__class__.__mro__))
|
||||
self.__check_writeable_size(len(data_hex) // 2 + offset)
|
||||
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
|
||||
|
||||
def update_binary_dec(self, data: dict):
|
||||
@@ -617,7 +567,6 @@ class RuntimeLchan:
|
||||
if not isinstance(self.selected_file, LinFixedEF):
|
||||
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
|
||||
self.selected_file.__class__.__mro__))
|
||||
self.__check_writeable_size(len(data_hex) // 2)
|
||||
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
|
||||
conserve=self.rs.conserve_write,
|
||||
leftpad=self.selected_file.leftpad)
|
||||
|
||||
@@ -750,7 +750,7 @@ class EF_ARR(LinFixedEF):
|
||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
|
||||
def do_read_arr_record(self, opts):
|
||||
"""Read one EF.ARR record in flattened, human-friendly form."""
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(opts.RECORD_NR)
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
|
||||
data = self._cmd.lchan.selected_file.flatten(data)
|
||||
self._cmd.poutput_json(data, opts.oneline)
|
||||
|
||||
|
||||
@@ -1109,9 +1109,3 @@ class CardCommandSet:
|
||||
if cla and not cmd.match_cla(cla):
|
||||
return None
|
||||
return cmd
|
||||
|
||||
|
||||
def all_subclasses_of(cls):
|
||||
for subc in cls.__subclasses__():
|
||||
yield subc
|
||||
yield from all_subclasses_of(subc)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
#!/bin/sh
|
||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
||||
2
setup.py
2
setup.py
@@ -21,7 +21,7 @@ setup(
|
||||
"pyscard",
|
||||
"pyserial",
|
||||
"pytlv",
|
||||
"cmd2 >= 1.5.0, < 3.0",
|
||||
"cmd2 >= 1.5.0",
|
||||
"jsonpath-ng",
|
||||
"construct >= 2.10.70",
|
||||
"bidict",
|
||||
|
||||
@@ -2,7 +2,7 @@ Detected UICC Add-on "SIM"
|
||||
Detected UICC Add-on "GSM-R"
|
||||
Detected UICC Add-on "RUIM"
|
||||
Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
|
||||
EF.DIR seems to be empty!
|
||||
warning: EF.DIR seems to be empty!
|
||||
ADF.ECASD: a0000005591010ffffffff8900000200
|
||||
ADF.ISD-R: a0000005591010ffffffff8900000100
|
||||
ISIM: a0000000871004
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
../../smdpp-data
|
||||
@@ -1,5 +0,0 @@
|
||||
"card_type_id","formfactor_id","imsi","iccid","pin1","puk1","pin2","puk2","ki","adm1","adm2","proprietary","kic1","kic2","kic3","kid1","kid2","kid3","kik1","kik2","kik3","msisdn","acc","opc"
|
||||
"myCardType","3FF","901700000000001","8988211000000000001","1234","12345678","1223","12345678","AAAAAAAAAAA5435425AAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 01","BBBBBBBBBB3324BBBBBBBB21212BBBBB","CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC","DDDD90DDDDDDDDDDDDDDDDDD767DDDDD","EEEEEE567657567567EEEEEEEEEEEEEE","FFFFFFFFFFFFFFFFFFF56765765FFFFF","11111567811111111111111111111111","22222222222222222227669999222222","33333333333333333333333333333333","44444444444444445234544444444444","55555555555","0001","66666666666666666666666666666666"
|
||||
"myCardType","3FF","901700000000002","8988211000000000002","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA","10101010","9999999999999999","proprietary data 02","BBBBBB421BBBBBBBBBB12BBBBBBBBBBB","CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC","DDDDDDDDD567657DDDD2DDDDDDDDDDDD","EEEEEEEE56756EEEEEEEEE567657EEEE","FFFFF567657FFFFFFFFFFFFFFFFFFFFF","11111111111146113433411576511111","22222222222223432225765222222222","33333333333333523453333333333333","44425435234444444444446544444444","55555555555","0001","66666666666666266666666666666666"
|
||||
"myCardType","3FF","901700000000003","8988211000000000003","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 03","BBBBBBB45678BBBB756765BBBBBBBBBB","CCCCCCCCCCCCCC76543CCCC56765CCCC","DDDDDDDDDDDDDDDDDD5676575DDDDDDD","EEEEEEEEEEEEEEEEEE56765EEEEEEEEE","FFFFFFFFFFFFFFF567657FFFFFFFFFFF","11111111119876511111111111111111","22222222222444422222222222576522","33333332543333576733333333333333","44444444444567657567444444444444","55555555555","0001","66666675676575666666666666666666"
|
||||
|
||||
|
@@ -1,152 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
import os
|
||||
from pySim.card_key_provider import *
|
||||
|
||||
class TestCardKeyProviderCsv(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||
"OPC" : "000102030405065545645645645D0E0F",
|
||||
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||
"KID1" : "00040267840507667609045645645E0F",
|
||||
"KIK1" : "0001020307687607668678678C0D0E0F",
|
||||
"KIC2" : "000142457594860706090A0B0688678F",
|
||||
"KID2" : "600102030405649468690A0B0C0D648F",
|
||||
"KIK2" : "00010203330506070496330B08640E0F",
|
||||
"KIC3" : "000104030405064684686A068C0D0E0F",
|
||||
"KID3" : "00010243048468070809060B0C0D0E0F",
|
||||
"KIK3" : "00010204040506070809488B0C0D0E0F"}
|
||||
|
||||
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
|
||||
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def test_card_key_provider_get(self):
|
||||
test_data = [{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': '48a6d5f60567d45299e3ba08594009e7', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '3eb8567fa0b4b1e63bcab13bff5f2702',
|
||||
'KIC2': 'fd6c173a5b3f04b563808da24237fb46',
|
||||
'KIC3': '66c8c848e5dff69d70689d155d44f323',
|
||||
'KID1': 'd78accce870332dced467c173244dd94',
|
||||
'KID2': 'b3bf050969747b2d2c9389e127a3d791',
|
||||
'KID3': '40a77deb50d260b3041bbde1b5040625',
|
||||
'KIK1': '451b503239d818ea34421aa9c2a8887a',
|
||||
'KIK2': '967716f5fca8ae179f87f76524d1ae6b',
|
||||
'KIK3': '0884db5eee5409a00fc1bbc57ac52541',
|
||||
'OPC': '81817574c1961dd272ad080eb2caf279'}, 'ICCID' :"8988211000000000001"},
|
||||
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': 'e94d7fa6fb92375dae86744ff6ecef49', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '79b4e39387c66253da68f653381ded44',
|
||||
'KIC2': '560561b5dba89c1da8d1920049e5e4f7',
|
||||
'KIC3': '79ff35e84e39305a119af8c79f84e8e5',
|
||||
'KID1': '233baf89122159553d67545ecedcf8e0',
|
||||
'KID2': '8fc2874164d7a8e40d72c968bc894ab8',
|
||||
'KID3': '2e3320f0dda85054d261be920fbfa065',
|
||||
'KIK1': 'd51b1b17630103d1672a3e9e0e4827ed',
|
||||
'KIK2': 'd01edbc48be555139506b0d7982bf7ff',
|
||||
'KIK3': 'a6487a5170849e8e0a03026afea91f5a',
|
||||
'OPC': '6b0d19ef28bd12f2daac31828d426939'}, 'ICCID' :"8988211000000000002"},
|
||||
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||
'KI': '3cdec1552ef433a89f327905213c5a6e', 'ADM1': '10101010',
|
||||
'ADM2': '9999999999999999', 'KIC1': '72986b13ce505e12653ad42df5cfca13',
|
||||
'KIC2': '8f0d1e58b01e833773e5562c4940674d',
|
||||
'KIC3': '9c72ba5a14d54f489edbffd3d8802f03',
|
||||
'KID1': 'd23a42995df9ca83f74b2cfd22695526',
|
||||
'KID2': '5c3a189d12aa1ac6614883d7de5e6c8c',
|
||||
'KID3': 'a6ace0d303a2b38a96b418ab83c16725',
|
||||
'KIK1': 'bf2319467d859c12527aa598430caef2',
|
||||
'KIK2': '6a4c459934bea7e40787976b8881ab01',
|
||||
'KIK3': '91cd02c38b5f68a98cc90a1f2299538f',
|
||||
'OPC': '6df46814b1697daca003da23808bbbc3'}, 'ICCID' :"8988211000000000003"}]
|
||||
|
||||
for t in test_data:
|
||||
result = card_key_provider_get(["PIN1","PUK1","PIN2","PUK2","KI","ADM1","ADM2","KIC1",
|
||||
"KIC2","KIC3","KID1","KID2","KID3","KIK1","KIK2","KIK3","OPC"],
|
||||
"ICCID", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
result = card_key_provider_get(["PIN1","puk1","PIN2","PUK2","KI","adm1","ADM2","KIC1",
|
||||
"KIC2","kic3","KID1","KID2","KID3","kik1","KIK2","KIK3","OPC"],
|
||||
"iccid", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
def test_card_key_provider_get_field(self):
|
||||
test_data = [{'EXPECTED' : "3eb8567fa0b4b1e63bcab13bff5f2702", 'ICCID' :"8988211000000000001"},
|
||||
{'EXPECTED' : "79b4e39387c66253da68f653381ded44", 'ICCID' :"8988211000000000002"},
|
||||
{'EXPECTED' : "72986b13ce505e12653ad42df5cfca13", 'ICCID' :"8988211000000000003"}]
|
||||
|
||||
for t in test_data:
|
||||
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
for t in test_data:
|
||||
result = card_key_provider_get_field("kic1", "iccid", t.get('ICCID'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
class TestCardKeyFieldCryptor(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||
"OPC" : "000102030405065545645645645D0E0F",
|
||||
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||
"UICC_SCP03" : "00040267840507667609045645645E0F"}
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def test_encrypt_field(self):
|
||||
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||
|
||||
for t in test_data:
|
||||
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
def test_decrypt_field(self):
|
||||
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||
|
||||
for t in test_data:
|
||||
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
|
||||
self.assertEqual(result, t.get('EXPECTED'))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,399 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import sys
|
||||
import unittest
|
||||
import io
|
||||
from importlib import resources
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.esim.saip import ProfileElementSequence
|
||||
import pySim.esim.saip.personalization as p13n
|
||||
import smdpp_data.upp
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
def valstr(val):
|
||||
if isinstance(val, io.BytesIO):
|
||||
val = val.getvalue()
|
||||
if isinstance(val, bytearray):
|
||||
val = bytes(val)
|
||||
return f'{val!r}'
|
||||
|
||||
def valtypestr(val):
|
||||
if isinstance(val, dict):
|
||||
types = []
|
||||
for v in val.values():
|
||||
types.append(f'{type(v).__name__}')
|
||||
|
||||
val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
val_type = f'{type(val).__name__}'
|
||||
return f'{valstr(val)}:{val_type}'
|
||||
|
||||
class D:
|
||||
mandatory = set()
|
||||
optional = set()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for k in self.optional:
|
||||
if not hasattr(self, k):
|
||||
setattr(self, k, None)
|
||||
|
||||
class ConfigurableParameterTest(unittest.TestCase):
|
||||
|
||||
def test_parameters(self):
|
||||
|
||||
upp_fnames = (
|
||||
'TS48v5_SAIP2.1A_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
|
||||
'TS48v5_SAIP2.1B_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_NoBERTLV.der',
|
||||
)
|
||||
|
||||
class Paramtest(D):
|
||||
mandatory = (
|
||||
'param_cls',
|
||||
'val',
|
||||
'expect_val',
|
||||
)
|
||||
optional = (
|
||||
'expect_clean_val',
|
||||
)
|
||||
|
||||
param_tests = [
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456',
|
||||
expect_clean_val=str('123456'),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
|
||||
expect_clean_val=str('123456789012345'),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk2,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(1234),
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='Milenage',
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='TUAK',
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='usim-test',
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=1,
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=2,
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=3,
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=int(11020304050607080910111213141516),
|
||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='11020304050607080910111213141516'),
|
||||
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
]
|
||||
|
||||
for sdkey_cls in (
|
||||
# thin out the number of tests, as a compromise between completeness and test runtime
|
||||
p13n.SdKeyScp80Kvn01Enc,
|
||||
#p13n.SdKeyScp80Kvn01Dek,
|
||||
#p13n.SdKeyScp80Kvn01Mac,
|
||||
#p13n.SdKeyScp80Kvn02Enc,
|
||||
p13n.SdKeyScp80Kvn02Dek,
|
||||
#p13n.SdKeyScp80Kvn02Mac,
|
||||
#p13n.SdKeyScp81Kvn81Enc,
|
||||
#p13n.SdKeyScp81Kvn81Dek,
|
||||
p13n.SdKeyScp81Kvn81Mac,
|
||||
#p13n.SdKeyScp81Kvn82Enc,
|
||||
#p13n.SdKeyScp81Kvn82Dek,
|
||||
#p13n.SdKeyScp81Kvn82Mac,
|
||||
p13n.SdKeyScp81Kvn83Enc,
|
||||
#p13n.SdKeyScp81Kvn83Dek,
|
||||
#p13n.SdKeyScp81Kvn83Mac,
|
||||
#p13n.SdKeyScp02Kvn20Enc,
|
||||
p13n.SdKeyScp02Kvn20Dek,
|
||||
#p13n.SdKeyScp02Kvn20Mac,
|
||||
#p13n.SdKeyScp02Kvn21Enc,
|
||||
#p13n.SdKeyScp02Kvn21Dek,
|
||||
p13n.SdKeyScp02Kvn21Mac,
|
||||
#p13n.SdKeyScp02Kvn22Enc,
|
||||
#p13n.SdKeyScp02Kvn22Dek,
|
||||
#p13n.SdKeyScp02Kvn22Mac,
|
||||
p13n.SdKeyScp02KvnffEnc,
|
||||
#p13n.SdKeyScp02KvnffDek,
|
||||
#p13n.SdKeyScp02KvnffMac,
|
||||
#p13n.SdKeyScp03Kvn30Enc,
|
||||
p13n.SdKeyScp03Kvn30Dek,
|
||||
#p13n.SdKeyScp03Kvn30Mac,
|
||||
#p13n.SdKeyScp03Kvn31Enc,
|
||||
#p13n.SdKeyScp03Kvn31Dek,
|
||||
p13n.SdKeyScp03Kvn31Mac,
|
||||
#p13n.SdKeyScp03Kvn32Enc,
|
||||
#p13n.SdKeyScp03Kvn32Dek,
|
||||
#p13n.SdKeyScp03Kvn32Mac,
|
||||
):
|
||||
|
||||
param_tests.extend([
|
||||
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='010203040506070809101112131415161718192021222324',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||
b'\x17\x18\x19\x20\x21\x22\x23\x24',
|
||||
expect_val='010203040506070809101112131415161718192021222324'),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='0102030405060708091011121314151617181920212223242526272829303132',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32',
|
||||
expect_val='0102030405060708091011121314151617181920212223242526272829303132'),
|
||||
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=11020304050607080910111213141516,
|
||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='11020304050607080910111213141516',
|
||||
),
|
||||
])
|
||||
|
||||
outputs = []
|
||||
|
||||
for upp_fname in upp_fnames:
|
||||
test_idx = -1
|
||||
try:
|
||||
|
||||
der = resources.read_binary(smdpp_data.upp, upp_fname)
|
||||
|
||||
for t in param_tests:
|
||||
test_idx += 1
|
||||
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
|
||||
|
||||
param = None
|
||||
try:
|
||||
param = t.param_cls()
|
||||
param.input_value = t.val
|
||||
param.validate()
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc}: {e}') from e
|
||||
|
||||
clean_val = param.value
|
||||
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
|
||||
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
|
||||
raise ValueError(f'{logloc}: expected'
|
||||
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
|
||||
|
||||
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
|
||||
# pes = copy.deepcopy(orig_pes)
|
||||
pes = ProfileElementSequence.from_der(der)
|
||||
try:
|
||||
param.apply(pes)
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
|
||||
|
||||
changed_der = pes.to_der()
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(changed_der)
|
||||
|
||||
read_back_val = t.param_cls.get_value_from_pes(pes2)
|
||||
|
||||
# compose log string to show the precise type of dict values
|
||||
if isinstance(read_back_val, dict):
|
||||
types = set()
|
||||
for v in read_back_val.values():
|
||||
types.add(f'{type(v).__name__}')
|
||||
|
||||
read_back_val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
read_back_val_type = f'{type(read_back_val).__name__}'
|
||||
|
||||
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
|
||||
|
||||
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
|
||||
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
|
||||
|
||||
expect_val = t.expect_val
|
||||
if not isinstance(expect_val, dict):
|
||||
expect_val = { t.param_cls.get_name(): expect_val }
|
||||
if read_back_val != expect_val:
|
||||
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
|
||||
|
||||
ok = logloc.replace(' clean_val', '\n\tclean_val'
|
||||
).replace(' read_back_val', '\n\tread_back_val'
|
||||
).replace('=', '=\t'
|
||||
)
|
||||
output = f'\nok: {ok}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_configurable_parameters'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
@@ -63,44 +63,6 @@ class SaipTest(unittest.TestCase):
|
||||
# TODO: we don't actually test the results here, but we just verify there is no exception
|
||||
pes.to_der()
|
||||
|
||||
def test_personalization2(self):
|
||||
"""Test some of the personalization operations."""
|
||||
pes = ProfileElementSequence.from_der(self.per_input)
|
||||
prev_val = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{prev_val=}')
|
||||
self.assertTrue(prev_val)
|
||||
|
||||
set_val = '42342342342342342342342342342342'
|
||||
param = SdKeyScp80_01Kic(set_val)
|
||||
param.validate()
|
||||
param.apply(pes)
|
||||
|
||||
get_val1 = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1=} {set_val=}')
|
||||
self.assertEqual(get_val1, set((set_val,)))
|
||||
|
||||
get_val1b = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1b=} {set_val=}')
|
||||
self.assertEqual(get_val1b, set((set_val,)))
|
||||
|
||||
print("HELLOO")
|
||||
der = pes.to_der()
|
||||
print("DONEDONE")
|
||||
|
||||
get_val1c = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1c=} {set_val=}')
|
||||
self.assertEqual(get_val1c, set((set_val,)))
|
||||
|
||||
# assertTrue to not dump the entire der.
|
||||
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
|
||||
# DER and the ConfigurableParameter subclass is buggy.
|
||||
self.assertTrue(der != self.per_input)
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(der)
|
||||
get_val2 = set(SdKeyScp80_01Kic.get_values_from_pes(pes2))
|
||||
print(f'{get_val2=} {set_val=}')
|
||||
self.assertEqual(get_val2, set((set_val,)))
|
||||
|
||||
def test_constructor_encode(self):
|
||||
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
||||
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import unittest
|
||||
import logging
|
||||
from pySim.log import PySimLogger
|
||||
import io
|
||||
import sys
|
||||
from inspect import currentframe, getframeinfo
|
||||
|
||||
log = PySimLogger.get("TEST")
|
||||
|
||||
TEST_MSG_DEBUG = "this is a debug message"
|
||||
TEST_MSG_INFO = "this is an info message"
|
||||
TEST_MSG_WARNING = "this is a warning message"
|
||||
TEST_MSG_ERROR = "this is an error message"
|
||||
TEST_MSG_CRITICAL = "this is a critical message"
|
||||
|
||||
expected_message = None
|
||||
|
||||
class PySimLogger_Test(unittest.TestCase):
|
||||
|
||||
def __test_01_safe_defaults_one(self, callback, message:str):
|
||||
# When log messages are sent to an unconfigured PySimLogger class, we expect the unmodified message being
|
||||
# logged to stdout, just as if it were printed via a normal print() statement.
|
||||
log_output = io.StringIO()
|
||||
sys.stdout = log_output
|
||||
callback(message)
|
||||
assert(log_output.getvalue().strip() == message)
|
||||
sys.stdout = sys.__stdout__
|
||||
|
||||
def test_01_safe_defaults(self):
|
||||
# When log messages are sent to an unconfigured PySimLogger class, we expect that all messages are logged,
|
||||
# regardless of the logging level.
|
||||
self.__test_01_safe_defaults_one(log.debug, TEST_MSG_DEBUG)
|
||||
self.__test_01_safe_defaults_one(log.info, TEST_MSG_INFO)
|
||||
self.__test_01_safe_defaults_one(log.warning, TEST_MSG_WARNING)
|
||||
self.__test_01_safe_defaults_one(log.error, TEST_MSG_ERROR)
|
||||
self.__test_01_safe_defaults_one(log.critical, TEST_MSG_CRITICAL)
|
||||
|
||||
@staticmethod
|
||||
def _test_print_callback(message):
|
||||
assert(message.strip() == expected_message)
|
||||
|
||||
def test_02_normal(self):
|
||||
# When the PySimLogger is set up with its default values, we expect formatted log messages on all logging
|
||||
# levels.
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
expected_message = "DEBUG: " + TEST_MSG_DEBUG
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
expected_message = "INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
def test_03_verbose(self):
|
||||
# When the PySimLogger is set up with its default values, we expect verbose formatted log messages on all
|
||||
# logging levels.
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
PySimLogger.set_verbose(True)
|
||||
frame = currentframe()
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
def test_04_level(self):
|
||||
# When the PySimLogger is set up with its default values, we expect formatted log messages but since we will
|
||||
# limit the log level to INFO, we should not see any messages of level DEBUG
|
||||
global expected_message
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
PySimLogger.set_level(logging.INFO)
|
||||
|
||||
# We test this in non verbose mode, this will also confirm that disabeling the verbose mode works.
|
||||
PySimLogger.set_verbose(False)
|
||||
|
||||
# Debug messages should not appear
|
||||
expected_message = None
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
|
||||
# All other messages should appear normally
|
||||
expected_message = "INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,216 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import math
|
||||
from importlib import resources
|
||||
import unittest
|
||||
from pySim.esim.saip import param_source
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
class D:
|
||||
mandatory = set()
|
||||
optional = set()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for k in self.optional:
|
||||
if not hasattr(self, k):
|
||||
setattr(self, k, None)
|
||||
|
||||
decimals = '0123456789'
|
||||
hexadecimals = '0123456789abcdefABCDEF'
|
||||
|
||||
class FakeRandom:
|
||||
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
|
||||
i = 0
|
||||
|
||||
@classmethod
|
||||
def next(cls):
|
||||
cls.i = (cls.i + 1) % len(cls.vals)
|
||||
return cls.vals[cls.i]
|
||||
|
||||
@staticmethod
|
||||
def randint(a, b):
|
||||
d = b - a
|
||||
n_bytes = math.ceil(math.log(d, 2))
|
||||
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
|
||||
return a + (r % (b - a))
|
||||
|
||||
@staticmethod
|
||||
def randbytes(n):
|
||||
return bytes(FakeRandom.next() for i in range(n))
|
||||
|
||||
|
||||
class ParamSourceTest(unittest.TestCase):
|
||||
|
||||
def test_param_source(self):
|
||||
|
||||
class ParamSourceTest(D):
|
||||
mandatory = (
|
||||
'param_source',
|
||||
'n',
|
||||
'expect',
|
||||
)
|
||||
optional = (
|
||||
'expect_arg',
|
||||
'csv_rows',
|
||||
)
|
||||
|
||||
def expect_const(t, vals):
|
||||
return tuple(t.expect_arg) == tuple(vals)
|
||||
|
||||
def expect_random(t, vals):
|
||||
chars = t.expect_arg.get('digits')
|
||||
repetitions = (t.n - len(set(vals)))
|
||||
if repetitions:
|
||||
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
|
||||
for val_i in range(len(vals)):
|
||||
v = vals[val_i]
|
||||
val_minlen = t.expect_arg.get('val_minlen')
|
||||
val_maxlen = t.expect_arg.get('val_maxlen')
|
||||
if len(v) < val_minlen or len(v) > val_maxlen:
|
||||
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
|
||||
f' {val_minlen}..{val_maxlen}')
|
||||
|
||||
if chars is not None and not all(c in chars for c in v):
|
||||
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
|
||||
return True
|
||||
|
||||
param_source_tests = [
|
||||
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('123', '123', '123')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 5,
|
||||
'val_maxlen': 5,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 1,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 3,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('10001', '10002', '10003')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('first val', 'second val', 'third val'),
|
||||
csv_rows=(
|
||||
{'column_name': 'first val',},
|
||||
{'column_name': 'second val',},
|
||||
{'column_name': 'third val',},
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
outputs = []
|
||||
|
||||
for t in param_source_tests:
|
||||
try:
|
||||
if hasattr(t.param_source, 'random_impl'):
|
||||
t.param_source.random_impl = FakeRandom
|
||||
|
||||
vals = []
|
||||
for i in range(t.n):
|
||||
csv_row = None
|
||||
if t.csv_rows is not None:
|
||||
csv_row = t.csv_rows[i]
|
||||
vals.append( t.param_source.get_next(csv_row=csv_row) )
|
||||
if not t.expect(t, vals):
|
||||
raise RuntimeError(f'invalid values returned: returned {vals}')
|
||||
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_param_src'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,9 +0,0 @@
|
||||
ok: ConstantSource vals=['123', '123', '123']
|
||||
ok: RandomDigitSource vals=['13987', '49298', '55670']
|
||||
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
|
||||
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
|
||||
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
|
||||
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
|
||||
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
|
||||
ok: IncDigitSource vals=['10001', '10002', '10003']
|
||||
ok: CsvSource vals=['first val', 'second val', 'third val']
|
||||
Reference in New Issue
Block a user