Compare commits

..

2 Commits

Author SHA1 Message Date
Philipp Maier
a3469bc03b card_key_provider: add PostgreSQL support
The Card Key Provider currently only has support for CSV files
as input. Unfortunately using CSV files does not scale very well
when the card inventory is very large and continously updated.
In this case a centralized storage in the form of a database
is the more suitable approach.

This patch adds PostgreSQL support next to the existing CSV
file support. It also adds an importer tool to import existing
CSV files into the database.

Change-Id: Icba625c02a60d7e1f519b506a46bda5ded0537d3
Related: SYS#7725
2025-12-18 15:59:31 +01:00
Philipp Maier
c118012fb9 pysim/log: also accept ANSI strings to specify the log message colors
the PySimLogger class currently only accepts cmd2 color enum values.
This is what we need for pySim-shell.py. However, in case we want to
use the PySimLogger in stand-alone programs that do not use cmd2, this
is a bit bulky. Let's add some flexibility to PySimLogger, so that we
can specify the colors as raw ANSI strings as well.

Change-Id: I93543e19649064043ae8323f82ecd8c423d1d921
Related: SYS#7725
2025-12-18 15:59:01 +01:00
76 changed files with 386 additions and 6276 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,5 @@
*.pyc *.pyc
.*.sw? .*.swp
/docs/_* /docs/_*
/docs/generated /docs/generated

View File

@@ -100,6 +100,7 @@ Please install the following dependencies:
- pyyaml >= 5.1 - pyyaml >= 5.1
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`) - smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
- termcolor - termcolor
- psycopg2-binary
Example for Debian: Example for Debian:
```sh ```sh

View File

@@ -1,112 +0,0 @@
#!/usr/bin/env python3
# A tool to analyze the eUICC simaResponse (series of EUICCResponse)
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from osmocom.utils import h2b, b2h
from osmocom.tlv import bertlv_parse_one, bertlv_encode_tag, bertlv_encode_len
from pySim.esim.saip import *
parser = argparse.ArgumentParser(description="""Utility program to analyze the contents of an eUICC simaResponse.""")
parser.add_argument('SIMA_RESPONSE', help='Hexstring containing the simaResponse as received from the eUICC')
def split_sima_response(sima_response):
"""split an eUICC simaResponse field into a list of EUICCResponse fields"""
remainder = sima_response
result = []
while len(remainder):
tdict, l, v, next_remainder = bertlv_parse_one(remainder)
rawtag = bertlv_encode_tag(tdict)
rawlen = bertlv_encode_len(l)
result = result + [remainder[0:len(rawtag) + len(rawlen) + l]]
remainder = next_remainder
return result
def analyze_status(status):
"""
Convert a status code (integer) into a human readable string
(see eUICC Profile Package: Interoperable Format Technical Specification, section 8.11)
"""
# SIMA status codes
string_values = {0 : 'ok',
1 : 'pe-not-supported',
2 : 'memory-failure',
3 : 'bad-values',
4 : 'not-enough-memory',
5 : 'invalid-request-format',
6 : 'invalid-parameter',
7 : 'runtime-not-supported',
8 : 'lib-not-supported',
9 : 'template-not-supported ',
10 : 'feature-not-supported',
11 : 'pin-code-missing',
31 : 'unsupported-profile-version'}
string_value = string_values.get(status, None)
if string_value is not None:
return "%d = %s (SIMA status code)" % (status, string_value)
# ISO 7816 status words
if status >= 24576 and status <= 28671:
return "%d = %04x (ISO7816 status word)" % (status, status)
elif status >= 36864 and status <= 40959:
return "%d = %04x (ISO7816 status word)" % (status, status)
# Proprietary status codes
elif status >= 40960 and status <= 65535:
return "%d = %04x (proprietary)" % (status, status)
# Unknown status codes
return "%d (unknown, proprietary?)" % status
def analyze_euicc_response(euicc_response):
"""Analyze and display the contents of an EUICCResponse"""
print(" EUICCResponse: %s" % b2h(euicc_response))
euicc_response_decoded = asn1.decode('EUICCResponse', euicc_response)
pe_status = euicc_response_decoded.get('peStatus')
print(" peStatus:")
for s in pe_status:
print(" status: %s" % analyze_status(s.get('status')))
print(" identification: %s" % str(s.get('identification', None)))
print(" additional-information: %s" % str(s.get('additional-information', None)))
print(" offset: %s" % str(s.get('offset', None)))
if euicc_response_decoded.get('profileInstallationAborted', False) is None:
# This type is defined as profileInstallationAborted NULL OPTIONAL, so when it is present it
# will have the value None, otherwise it is simply not present.
print(" profileInstallationAborted: True")
else:
print(" profileInstallationAborted: False")
status_message = euicc_response_decoded.get('statusMessage', None)
print(" statusMessage: %s" % str(status_message))
if __name__ == '__main__':
opts = parser.parse_args()
sima_response = h2b(opts.SIMA_RESPONSE);
print("simaResponse: %s" % b2h(sima_response))
euicc_response_list = split_sima_response(sima_response)
for euicc_response in euicc_response_list:
analyze_euicc_response(euicc_response)

View File

@@ -1,28 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import logging import logging
import csv import csv
import sys import sys
import os
import yaml import yaml
import psycopg2 import psycopg2
from psycopg2.sql import Identifier, SQL from psycopg2.sql import Identifier, SQL
@@ -30,7 +11,7 @@ from pathlib import Path
from pySim.log import PySimLogger from pySim.log import PySimLogger
from packaging import version from packaging import version
log = PySimLogger.get(Path(__file__).stem) log = PySimLogger.get("CSV2PGQSL")
class CardKeyDatabase: class CardKeyDatabase:
def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False): def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False):
@@ -53,7 +34,8 @@ class CardKeyDatabase:
raise ValueError("user for role '%s' not set up in config file." % role) raise ValueError("user for role '%s' not set up in config file." % role)
return user.get('name'), user.get('pass') return user.get('name'), user.get('pass')
self.table = table_name.lower() log = PySimLogger.get("PQSQL")
self.table = table_name
self.cols = None self.cols = None
# Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys". # Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys".
@@ -113,7 +95,7 @@ class CardKeyDatabase:
""" """
# Create table columns with primary key # Create table columns with primary key
query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table), query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table.lower()),
Identifier(cols[0].lower())) Identifier(cols[0].lower()))
for c in cols[1:]: for c in cols[1:]:
query += SQL(", {} VARCHAR").format(Identifier(c.lower())) query += SQL(", {} VARCHAR").format(Identifier(c.lower()))
@@ -123,16 +105,16 @@ class CardKeyDatabase:
# Create indexes for all other columns # Create indexes for all other columns
for c in cols[1:]: for c in cols[1:]:
self.cur.execute(query = SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()), self.cur.execute(query = SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()),
Identifier(self.table), Identifier(self.table.lower()),
Identifier(c.lower()))) Identifier(c.lower())))
# Set permissions # Set permissions
self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table), self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_importer))) Identifier(user_importer)))
self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table), self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_reader))) Identifier(user_reader)))
log.info("New database table created: %s", self.table) log.info("New database table created: %s", str(self.table.lower()))
def get_cols(self) -> list[str]: def get_cols(self) -> list[str]:
""" """
@@ -147,7 +129,7 @@ class CardKeyDatabase:
return self.cols return self.cols
# Request a list of current cols from the database # Request a list of current cols from the database
self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table,)) self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table.lower(),))
cols_result = self.cur.fetchall() cols_result = self.cur.fetchall()
cols = [] cols = []
@@ -190,7 +172,7 @@ class CardKeyDatabase:
# Add the missing columns to the table # Add the missing columns to the table
self.cols = None self.cols = None
for c in cols_missing: for c in cols_missing:
self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table), self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table.lower()),
Identifier(c.lower()))) Identifier(c.lower())))
def insert_row(self, row:dict[str, str]): def insert_row(self, row:dict[str, str]):
@@ -211,7 +193,7 @@ class CardKeyDatabase:
# Insert row into datbase table # Insert row into datbase table
row_keys = list(row.keys()) row_keys = list(row.keys())
row_values = list(row.values()) row_values = list(row.values())
query = SQL("INSERT INTO {} ").format(Identifier(self.table)) query = SQL("INSERT INTO {} ").format(Identifier(self.table.lower()))
query += SQL("({} ").format(Identifier(row_keys[0].lower())) query += SQL("({} ").format(Identifier(row_keys[0].lower()))
for k in row_keys[1:]: for k in row_keys[1:]:
query += SQL(", {}").format(Identifier(k.lower())) query += SQL(", {}").format(Identifier(k.lower()))
@@ -237,7 +219,7 @@ def open_csv(opts: argparse.Namespace):
def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase: def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase:
try: try:
db = CardKeyDatabase(os.path.expanduser(opts.pgsql), opts.table_name, opts.create_table, opts.admin) db = CardKeyDatabase(opts.pqsql, opts.table_name, opts.create_table, opts.admin)
# Check CSV format against table schema, add missing columns # Check CSV format against table schema, add missing columns
cols_missing = db.get_missing_cols(cr.fieldnames) cols_missing = db.get_missing_cols(cr.fieldnames)
@@ -275,8 +257,8 @@ if __name__ == '__main__':
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider', option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False) option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
option_parser.add_argument('--pgsql', metavar='FILE', option_parser.add_argument('--pqsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg", default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)') help='Read card data from PostgreSQL database (config file)')
option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True) option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True)
option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True) option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True)

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import argparse
import logging
import json
import asn1tools
import asn1tools.codecs.ber
import asn1tools.codecs.der
import pySim.esim.rsp as rsp
import pySim.esim.saip as saip
from pySim.esim.es2p import param, Es2pApiServerMno, Es2pApiServerHandlerMno
from osmocom.utils import b2h
from datetime import datetime
from analyze_simaResponse import split_sima_response
from pathlib import Path
logger = logging.getLogger(Path(__file__).stem)
parser = argparse.ArgumentParser(description="""
Utility to receive and log requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
parser.add_argument("--host", help="Host/IP to bind HTTP(S) to", default="localhost")
parser.add_argument("--port", help="TCP port to bind HTTP(S) to", default=443, type=int)
parser.add_argument('--server-cert', help='X.509 server certificate used to provide the ES2+ HTTPs service')
parser.add_argument('--client-ca-cert', help='X.509 CA certificates to authenticate the requesting client(s)')
parser.add_argument("-v", "--verbose", help="enable debug output", action='store_true', default=False)
def decode_sima_response(sima_response):
decoded = []
euicc_response_list = split_sima_response(sima_response)
for euicc_response in euicc_response_list:
decoded.append(saip.asn1.decode('EUICCResponse', euicc_response))
return decoded
def decode_result_data(result_data):
return rsp.asn1.decode('PendingNotification', result_data)
def decode(data, path="/"):
if data is None:
return 'none'
elif type(data) is datetime:
return data.isoformat()
elif type(data) is tuple:
return {str(data[0]) : decode(data[1], path + str(data[0]) + "/")}
elif type(data) is list:
new_data = []
for item in data:
new_data.append(decode(item, path))
return new_data
elif type(data) is bytes:
return b2h(data)
elif type(data) is dict:
new_data = {}
for key, item in data.items():
new_key = str(key)
if path == '/' and new_key == 'resultData':
new_item = decode_result_data(item)
elif (path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/successResult/' \
or path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/errorResult/') \
and new_key == 'simaResponse':
new_item = decode_sima_response(item)
else:
new_item = item
new_data[new_key] = decode(new_item, path + new_key + "/")
return new_data
else:
return data
class Es2pApiServerHandlerForLogging(Es2pApiServerHandlerMno):
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
logging.info("ES2+:handleDownloadProgressInfo: %s" % json.dumps(decode(data)))
return {}, None
if __name__ == "__main__":
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
Es2pApiServerMno(args.port, args.host, Es2pApiServerHandlerForLogging(), args.server_cert, args.client_ca_cert)

View File

@@ -126,14 +126,14 @@ class Es9pClient:
if self.opts.iccid: if self.opts.iccid:
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid)) ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
if self.opts.operation == 'install': if self.opts.operation == 'download':
pird = { pird = {
'transactionId': h2b(self.opts.transaction_id), 'transactionId': self.opts.transaction_id,
'notificationMetadata': ntf_metadata, 'notificationMetadata': ntf_metadata,
'smdpOid': self.opts.smdpp_oid, 'smdpOid': self.opts.smdpp_oid,
'finalResult': ('successResult', { 'finalResult': ('successResult', {
'aid': h2b(self.opts.isdp_aid), 'aid': self.opts.isdp_aid,
'simaResponse': h2b(self.opts.sima_response), 'simaResponse': self.opts.sima_response,
}), }),
} }
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird) pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)

View File

@@ -42,9 +42,6 @@ case "$JOB_TYPE" in
# Run pySim-shell integration tests (requires physical cards) # Run pySim-shell integration tests (requires physical cards)
python3 -m unittest discover -v -s ./tests/pySim-shell_test/ python3 -m unittest discover -v -s ./tests/pySim-shell_test/
# Run pySim-smpp2sim test
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
;; ;;
"distcheck") "distcheck")
virtualenv -p python3 venv --system-site-packages virtualenv -p python3 venv --system-site-packages

View File

@@ -107,7 +107,7 @@ parser_esrv.add_argument('--output-file', required=True, help='Output file name'
parser_esrv.add_argument('--add-flag', default=[], choices=esrv_flag_choices, action='append', help='Add flag to mandatory services list') parser_esrv.add_argument('--add-flag', default=[], choices=esrv_flag_choices, action='append', help='Add flag to mandatory services list')
parser_esrv.add_argument('--remove-flag', default=[], choices=esrv_flag_choices, action='append', help='Remove flag from mandatory services list') parser_esrv.add_argument('--remove-flag', default=[], choices=esrv_flag_choices, action='append', help='Remove flag from mandatory services list')
parser_tree = subparsers.add_parser('tree', help='Display the filesystem tree') parser_info = subparsers.add_parser('tree', help='Display the filesystem tree')
def write_pes(pes: ProfileElementSequence, output_file:str): def write_pes(pes: ProfileElementSequence, output_file:str):
"""write the PE sequence to a file""" """write the PE sequence to a file"""

View File

@@ -1,240 +0,0 @@
#!/usr/bin/env python3
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Harald Welte, Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import logging
import smpplib.gsm
import smpplib.client
import smpplib.consts
import time
from pySim.ota import OtaKeyset, OtaDialectSms, OtaAlgoCrypt, OtaAlgoAuth, CNTR_REQ, RC_CC_DS, POR_REQ
from pySim.utils import b2h, h2b, is_hexstr
from pathlib import Path
logger = logging.getLogger(Path(__file__).stem)
option_parser = argparse.ArgumentParser(description='Tool to send OTA SMS RFM/RAM messages via SMPP',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost")
option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int)
option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test")
option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test")
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
algo_crypt_choices = []
algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
for cls in algo_crypt_classes:
algo_crypt_choices.append(cls.enum_name)
option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2',
help="OTA crypt algorithm")
algo_auth_choices = []
algo_auth_classes = OtaAlgoAuth.__subclasses__()
for cls in algo_auth_classes:
algo_auth_choices.append(cls.enum_name)
option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2',
help="OTA auth algorithm")
option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)')
option_parser.add_argument('--kic-idx', default=1, type=int, help='OTA key index (KIC)')
option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)')
option_parser.add_argument('--kid-idx', default=1, type=int, help='OTA key index (KID)')
option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter')
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
option_parser.add_argument("--cntr-req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
help="Counter requirement")
option_parser.add_argument('--no-ciphering', action='store_true', default=False, help='Disable ciphering')
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
option_parser.add_argument('--por-in-submit', action='store_true', default=False,
help='require PoR to be sent via SMS-SUBMIT')
option_parser.add_argument('--por-no-ciphering', action='store_true', default=False, help='Disable ciphering (PoR)')
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
option_parser.add_argument("--por-req", choices=POR_REQ.decmapping.values(), default='por_required',
help="Proof of Receipt requirements")
option_parser.add_argument('--src-addr', default='12', type=str, help='SMS source address (MSISDN)')
option_parser.add_argument('--dest-addr', default='23', type=str, help='SMS destination address (MSISDN)')
option_parser.add_argument('--timeout', default=10, type=int, help='Maximum response waiting time')
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
class SmppHandler:
client = None
def __init__(self, host: str, port: int,
system_id: str, password: str,
ota_keyset: OtaKeyset, spi: dict, tar: bytes):
"""
Initialize connection to SMPP server and set static OTA SMS-TPDU ciphering parameters
Args:
host : Hostname or IPv4/IPv6 address of the SMPP server
port : TCP Port of the SMPP server
system_id: SMPP System-ID used by ESME (client) to bind
password: SMPP Password used by ESME (client) to bind
ota_keyset: OTA keyset to be used for SMS-TPDU ciphering
spi: Security Parameter Indicator (SPI) to be used for SMS-TPDU ciphering
tar: Toolkit Application Reference (TAR) of the targeted card application
"""
# Create and connect SMPP client
client = smpplib.client.Client(host, port, allow_unknown_opt_params=True)
client.set_message_sent_handler(self.message_sent_handler)
client.set_message_received_handler(self.message_received_handler)
client.connect()
client.bind_transceiver(system_id=system_id, password=password)
self.client = client
# Setup static OTA parameters
self.ota_dialect = OtaDialectSms()
self.ota_keyset = ota_keyset
self.tar = tar
self.spi = spi
def __del__(self):
if self.client:
self.client.unbind()
self.client.disconnect()
def message_received_handler(self, pdu):
if pdu.short_message:
logger.info("SMS-TPDU received: %s", b2h(pdu.short_message))
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
# Retry to decoding with ciphering disabled (in case the card has problems to decode the SMS-TDPU
# we have sent, the response will contain an unencrypted error message)
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
logger.info("SMS-TPDU decoded: %s", dec)
self.response = dec
return None
def message_sent_handler(self, pdu):
logger.debug("SMS-TPDU sent: pdu_sequence=%s pdu_message_id=%s", pdu.sequence, pdu.message_id)
def transceive_sms_tpdu(self, tpdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple:
"""
Transceive SMS-TPDU. This method sends the SMS-TPDU to the SMPP server, and waits for a response. The method
returns when the response is received.
Args:
tpdu : short message content (plaintext)
src_addr : short message source address
dest_addr : short message destination address
timeout : timeout after which this method should give up waiting for a response
Returns:
tuple containing the response (plaintext)
"""
logger.info("SMS-TPDU sending: %s...", b2h(tpdu))
self.client.send_message(
# TODO: add parameters to switch source_addr_ton and dest_addr_ton between SMPP_TON_INTL and SMPP_NPI_ISDN
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
source_addr=src_addr,
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
destination_addr=dest_addr,
short_message=tpdu,
# TODO: add parameters to set data_coding and esm_class
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
# TODO: add parameter to use registered delivery
# registered_delivery=True,
)
logger.info("SMS-TPDU sent, waiting for response...")
timestamp_sent=int(time.time())
self.response = None
while self.response is None:
self.client.poll()
if int(time.time()) - timestamp_sent > timeout:
raise ValueError("Timeout reached, no response SMS-TPDU received!")
return self.response
def transceive_apdu(self, apdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple[bytes, bytes]:
"""
Transceive APDU. This method wraps the given APDU into an SMS-TPDU, sends it to the SMPP server and waits for
the response. When the response is received, the last response data and the last status word is extracted from
the response and returned to the caller.
Args:
apdu : one or more concatenated APDUs
src_addr : short message source address
dest_addr : short message destination address
timeout : timeout after which this method should give up waiting for a response
Returns:
tuple containing the last response data and the last status word as byte strings
"""
logger.info("C-APDU sending: %s...", b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
response = self.transceive_sms_tpdu(tpdu, src_addr, dest_addr, timeout)
# Extract last_response_data and last_status_word from the response
sw = None
resp = None
for container in response:
if container:
container_dict = dict(container)
resp = container_dict.get('last_response_data')
sw = container_dict.get('last_status_word')
if resp is None:
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
if sw is None:
raise ValueError("Response does not contain any last_status_word, no R-APDU received!")
logger.info("R-APDU received: %s %s", resp, sw)
return h2b(resp), h2b(sw)
if __name__ == '__main__':
opts = option_parser.parse_args()
logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
if opts.kic_idx != opts.kid_idx:
logger.warning("KIC index (%s) and KID index (%s) are different (security violation, card should reject message)",
opts.kic_idx, opts.kid_idx)
ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
kic_idx=opts.kic_idx,
kic=h2b(opts.kic),
algo_auth=opts.algo_auth,
kid_idx=opts.kid_idx,
kid=h2b(opts.kid),
cntr=opts.cntr)
spi = {'counter' : opts.cntr_req,
'ciphering' : not opts.no_ciphering,
'rc_cc_ds': opts.rc_cc_ds,
'por_in_submit': opts.por_in_submit,
'por_shall_be_ciphered': not opts.por_no_ciphering,
'por_rc_cc_ds': opts.por_rc_cc_ds,
'por': opts.por_req}
apdu = h2b("".join(opts.apdu))
smpp_handler = SmppHandler(opts.host, opts.port, opts.system_id, opts.password, ota_keyset, spi, h2b(opts.tar))
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
print("%s %s" % (b2h(resp), b2h(sw)))

View File

@@ -48,48 +48,39 @@ must be made available on multiple sites, the `CardKeyProviderPgsql` is the
better option. better option.
The CardKeyProviderPgsql The CardKeyProviderPqsql
------------------------ ------------------------
With the `CardKeyProviderPgsql` you can use a PostgreSQL database as storage With the `CardKeyProviderPsql` you can use a PostgreSQL database as storage
medium. The implementation comes with a CSV importer tool that consumes the medium. The implementation comes with a CSV importer tool that consumes the
same CSV files you would normally use with the `CardKeyProviderCsv`, so you same CSV files you would normally use with the `CardKeyProviderCsv`, so you
can just use your existing CSV files and import them into the database. can just use your existing CSV files and import them into the database.
Requirements
^^^^^^^^^^^^
The `CardKeyProviderPgsql` uses the `Psycopg` PostgreSQL database adapter
(https://www.psycopg.org). `Psycopg` is not part of the default requirements
of pySim-shell and must be installed separately. `Psycopg` is available as
Python package under the name `psycopg2-binary`.
Setting up the database Setting up the database
^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^
From the perspective of the database, the `CardKeyProviderPgsql` has only From the perspective of the database, the `CardKeyProviderPsql` has only
minimal requirements. You do not have to create any tables in advance. An empty minimal requirements. You do not have to create any tables in advance. An empty
database and at least one user that may create, alter and insert into tables is database and at least one user that may create, alter and insert into tables is
sufficient. However, for increased reliability and as a protection against sufficient. However, for increased reliability and as a protection against
incorrect operation, the `CardKeyProviderPgsql` supports a hierarchical model incorrect operation, the `CardKeyProviderPsql` supports a hierarchical model
with three users (or roles): with three users (or roles):
* **admin**: * **admin**:
This should be the owner of the database. It is intended to be used for This should be the owner of the database. It is intended to be used for
administrative tasks like adding new tables or adding new columns to existing administrative tasks like adding new tables or adding new columns to existing
tables. This user should not be used to insert new data into tables or to access tables. This user should not be used to insert new data into tables or to access
data from within pySim-shell using the `CardKeyProviderPgsql` data from within pySim-shell using the `CardKeyProviderPsql`
* **importer**: * **importer**:
This user is used when feeding new data into an existing table. It should only This user is used when feeding new data into an existing table. It should only
be able to insert new rows into existing tables. It should not be used for be able to insert new rows into existing tables. It should not be used for
administrative tasks or to access data from within pySim-shell using the administrative tasks or to access data from within pySim-shell using the
`CardKeyProviderPgsql` `CardKeyProviderPsql`
* **reader**: * **reader**:
To access data from within pySim shell using the `CardKeyProviderPgsql` the To access data from within pySim shell using the `CardKeyProviderPsql` the
reader user is the correct one to use. This user should have no write access reader user is the correct one to use. This user should have no write access
to the database or any of the tables. to the database or any of the tables.
@@ -97,7 +88,7 @@ with three users (or roles):
Creating a config file Creating a config file
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
The default location for the config file is `~/.osmocom/pysim/card_data_pgsql.cfg` The default location for the config file is `~/.osmocom/pysim/card_data_pqsql.cfg`
The file uses `yaml` syntax and should look like the example below: The file uses `yaml` syntax and should look like the example below:
:: ::
@@ -129,9 +120,9 @@ passwords for each of the aforementioned users (or roles). In case only a single
admin user is used, all three entries may be populated with the same user name admin user is used, all three entries may be populated with the same user name
and password (not recommended) and password (not recommended)
The field `table_names` sets the tables that the `CardKeyProviderPgsql` shall The field `table_names` sets the tables that the `CardKeyProviderPsql` shall
use to query to locate card key data. You can set up as many tables as you use to query to locate card key data. You can set up as many tables as you
want, `CardKeyProviderPgsql` will query them in order, one by one until a want, `CardKeyProviderPsql` will query them in order, one by one until a
matching entry is found. matching entry is found.
NOTE: In case you do not want to disclose the admin and the importer credentials NOTE: In case you do not want to disclose the admin and the importer credentials
@@ -173,7 +164,7 @@ with the `admin` user. The `admin` user is selected using the `--admin` switch.
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin $ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin
INFO: CSV file: ./csv-to-pgsql_example_01.csv INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1'] INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1 INFO: Database host: 127.0.0.1
INFO: Database name: my_database INFO: Database name: my_database
INFO: Database user: my_admin_user INFO: Database user: my_admin_user
@@ -191,7 +182,7 @@ now ready to be filled with data.
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys $ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_01.csv INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1'] INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1 INFO: Database host: 127.0.0.1
INFO: Database name: my_database INFO: Database name: my_database
INFO: Database user: my_importer_user INFO: Database user: my_importer_user
@@ -223,7 +214,7 @@ creating new tables, this operation also requires admin privileges, so the
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin $ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin
INFO: CSV file: ./csv-to-pgsql_example_02.csv INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1'] INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1 INFO: Database host: 127.0.0.1
INFO: Database name: my_database INFO: Database name: my_database
INFO: Database user: my_admin_user INFO: Database user: my_admin_user
@@ -240,7 +231,7 @@ first one:
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys $ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_02.csv INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1'] INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1 INFO: Database host: 127.0.0.1
INFO: Database name: my_database INFO: Database name: my_database
INFO: Database user: my_importer_user INFO: Database user: my_importer_user

View File

@@ -48,7 +48,6 @@ pySim consists of several parts:
sim-rest sim-rest
suci-keytool suci-keytool
saip-tool saip-tool
smpp-ota-tool
Indices and tables Indices and tables

View File

@@ -1,179 +0,0 @@
smpp-ota-tool
=============
The `smpp-ota-tool` allows users to send OTA SMS messages containing APDU scripts (RFM, RAM) via an SMPP server. The
intended audience are developers who want to test/evaluate the OTA SMS interface of a SIM/UICC/eUICC. `smpp-ota-tool`
is intended to be used as a companion tool for :ref:`pySim-smpp2sim`, however it should be usable on any other SMPP
server (such as a production SMSC of a live cellular network) as well.
From the technical perspective `smpp-ota-tool` takes the role of an SMPP ESME. It takes care of the encoding, encryption
and checksumming (signing) of the RFM/RAM OTA SMS and eventually submits it to the SMPP server. The program then waits
for a response. The response is automatically parsed and printed on stdout. This makes the program also suitable to be
called from shell scripts.
.. note:: In the following we will we will refer to `SIM` as one of the following: `SIM`, `USIM`, `ISIM`, `UICC`,
`eUICC`, `eSIM`.
Applying OTA keys
~~~~~~~~~~~~~~~~~
Depending on the `SIM` type you will receive one or more sets of keys which you can use to communicate with the `SIM`
through a secure channel protocol. When using the OTA SMS method, the SCP80 protocol is used and it therefore crucial
to use a keyset that is actually suitable for SCP80.
A keyset usually consists of three keys:
#. KIC: the key used for ciphering (encryption/decryption)
#. KID: the key used to compute a cryptographic checksum (signing)
#. KIK: the key used to encrypt/decrypt key material (key rotation, adding of new keys)
From the transport security perspective, only KIC and KID are relevant. The KIK (also referenced as "Data Encryption
Key", DEK) is only used when keys are rotated or new keys are added (see also ETSI TS 102 226, section 8.2.1.5).
When the keyset is programmed into the security domain of the `SIM`, it is tied to a specific cryptographic algorithm
(3DES, AES128 or AES256) and a so called Key Version Number (KVN). The term "Key Version Number" is misleading, since
it is actually not a version number. It is a unique identifier of a certain keyset which also identifies for which
secure channel protocol the keyset may be used. Keysets with a KVN from 1-15 (``0x01``-``0x0F``) are suitable for SCP80.
This means that it is not only important to know just the KIC/KID/KIK keys. Also the related algorithms and the KVN
numbers must be known.
.. note:: SCP80 keysets typically start counting from 1 upwards. Typical configurations use a set of 3 keysets with
KVN numbers 1-3.
Addressing an Application
~~~~~~~~~~~~~~~~~~~~~~~~~
When communicating with a specific application on a `SIM` via SCP80, it is important to address that application with
the correct parameters. The following two parameters must be known in advance:
#. TAR: The Toolkit Application Reference (TAR) number is a three byte value that uniquely addresses an application
on the `SIM`. The exact values may vary (see also ETSI TS 101 220, Table D.1).
#. MSL: The Minimum Security Level (MSL) is a bit-field that dictates which of the security measures encoded in the
SPI are mandatory (see also ETSI TS 102 225, section 5.1.1).
A practical example
~~~~~~~~~~~~~~~~~~~
.. note:: This tutorial assumes that pySim-smpp2sim is running on the local machine with its default parameters.
See also :ref:`pySim-smpp2sim`.
Let's assume that an OTA SMS shall be sent to the SIM RFM application of an sysmoISIM-SJA2. What we want to do is to
select DF.GSM and to get the select response back.
We have received the following key material from the `SIM` vendor:
::
KIC1: F09C43EE1A0391665CC9F05AF4E0BD10
KID1: 01981F4A20999F62AF99988007BAF6CA
KIK1: 8F8AEE5CDCC5D361368BC45673D99195
KIC2: 01022916E945B656FDE03F806A105FA2
KID2: D326CB69F160333CC5BD1495D448EFD6
KIK2: 08037E0590DFE049D4975FFB8652F625
KIC3: 2B22824D0D27A3A1CEEC512B312082B4
KID3: F1697766925A11F4458295590137B672
KIK3: C7EE69B2C5A1C8E160DD36A38EB517B3
Those are three keysets. The enumeration is directly equal to the KVN used. All three keysets are 3DES keys, which
means triple_des_cbc2 is the correct algorithm to use.
.. note:: The key set configuration can be confirmed by retrieving the key configuration using
`get_data key_information` from within an SCP02 session on ADF.ISD.
In this example we intend to address the SIM RFM application on the `SIM`. Which according to the manual has TAR ``B00010``
and MSL ``0x06``. When we hold ``0x06`` = ``0b00000110`` against the SPI coding chart (see also ETSI TS 102 225,
section 5.1.1). We can deduct that Ciphering and Cryptographic Checksum are mandatory.
.. note:: The MSL (see also ETSI TS 102 226, section 6.1) is assigned to an application by the `SIM` issuer. It is a
custom decision and may vary with different `SIM` types/profiles. In the case of sysmoISIM-SJS1/SJA2/SJA5 the
counter requirement has been waived to simplify lab/research type use. In productive environments, `SIM`
applications should ideally use an MSL that makes the counter mandatory.
In order to select DF.GSM (``0x7F20``) and to retrieve the select response, two APDUs are needed. The first APDU is the
select command ``A0A40000027F20`` and the second is the related get-response command ``A0C0000016``. Those APDUs will be
concatenated and are sent in a single message. The message containing the concatenated APDUs works as a script that
is received by the SIM RFM application and then executed. This method poses some limitations that have to be taken into
account when making requests like this (see also ETSI TS 102 226, section 5).
With this information we may now construct a commandline for `smpp-ota-tool.py`. We will pass the KVN as kid_idx and
kic_idx (see also ETSI TS 102 225, Table 2, fields `KIc` and `KID`). Both index values should refer to the same
keyset/KVN as keysets should not be mixed. (`smpp-ota-tool` still provides separate parameters anyway to allow testing
with invalid keyset combinations)
::
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016
2026-02-26 17:13:56 INFO Connecting to localhost:2775...
2026-02-26 17:13:56 INFO C-APDU sending: a0a40000027f20a0c0000016...
2026-02-26 17:13:56 INFO SMS-TPDU sending: 02700000281506191515b00010da1d6cbbd0d11ce4330d844c7408340943e843f67a6d7b0674730881605fd62d...
2026-02-26 17:13:56 INFO SMS-TPDU sent, waiting for response...
2026-02-26 17:13:56 INFO SMS-TPDU received: 027100002c12b000107ddf58d1780f771638b3975759f4296cf5c31efc87a16a1b61921426baa16da1b5ba1a9951d59a39
2026-02-26 17:13:56 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\x8f\xea\xf5.\xf4\x0e\xc2\x14', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
2026-02-26 17:13:56 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
0000ffff7f2002000000000009b106350400838a838a 9000
2026-02-26 17:13:56 INFO Disconnecting...
The result we see is the select response of DF.GSM and a status word indicating that the last command has been
processed normally.
As we can see, this mechanism now allows us to perform small administrative tasks remotely. We can read the contents of
files remotely or make changes to files. Depending on the changes we make, there may be security issues arising from
replay attacks. With the commandline above, the communication is encrypted and protected by a cryptographic checksum,
so an adversary can neither read, nor alter the message. However, an adversary could still replay an intercepted
message and the `SIM` would happily execute the contained APDUs again.
To prevent this, we may include a replay protection counter within the message. In this case, the MSL indicates that a
replay protection counter is not required. However, to extended the security of our messages, we may chose to use a
counter anyway. In the following example, we will encode a counter value of 100. We will instruct the `SIM` to make sure
that the value we send is higher than the counter value that is currently stored in the `SIM`.
To add a replay connection counter we add the commandline arguments `--cntr-req` to set the counter requirement and
`--cntr` to pass the counter value.
::
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
2026-02-26 17:16:39 INFO Connecting to localhost:2775...
2026-02-26 17:16:39 INFO C-APDU sending: a0a40000027f20a0c0000016...
2026-02-26 17:16:39 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
2026-02-26 17:16:39 INFO SMS-TPDU sent, waiting for response...
2026-02-26 17:16:39 INFO SMS-TPDU received: 027100002c12b0001049fb0315f6c6401b553867f412cefaf9355b38271178edb342a3bc9cc7e670cdc1f45eea6ffcbb39
2026-02-26 17:16:39 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00d', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\xa9/\xc7\xc9\x00"\xab5', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
2026-02-26 17:16:39 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
0000ffff7f2002000000000009b106350400838a838a 9000
2026-02-26 17:16:39 INFO Disconnecting...
The `SIM` has accepted the message. The message got processed and the `SIM` has set its internal to 100. As an experiment,
we may try to re-use the counter value:
::
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
2026-02-26 17:16:43 INFO Connecting to localhost:2775...
2026-02-26 17:16:43 INFO C-APDU sending: a0a40000027f20a0c0000016...
2026-02-26 17:16:43 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
2026-02-26 17:16:43 INFO SMS-TPDU sent, waiting for response...
2026-02-26 17:16:43 INFO SMS-TPDU received: 027100000b0ab0001000000000000006
2026-02-26 17:16:43 INFO SMS-TPDU decoded: (Container(rpl=11, rhl=10, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(6, 'undefined_security_error'), cc_rc=b'', secured_data=b''), None)
Traceback (most recent call last):
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 238, in <module>
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 162, in transceive_apdu
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
ValueError: Response does not contain any last_response_data, no R-APDU received!
2026-02-26 17:16:43 INFO Disconnecting...
As we can see, the `SIM` has rejected the message with an `undefined_security_error`. The replay-protection-counter
ensures that a message can only be sent once.
.. note:: The replay-protection-counter is implemented as a 5 byte integer value (see also ETSI TS 102 225, Table 3).
When the counter has reached its maximum, it will not overflow nor can it be reset.
smpp-ota-tool syntax
~~~~~~~~~~~~~~~~~~~~
.. argparse::
:module: contrib.smpp-ota-tool
:func: option_parser
:prog: contrib/smpp-ota-tool.py

View File

@@ -55,5 +55,3 @@ And once your external program is sending SMS to the simulated SMSC, it will log
SMSPPDownload(DeviceIdentities({'source_dev_id': 'network', 'dest_dev_id': 'uicc'}),Address({'ton_npi': 0, 'call_number': '0123456'}),SMS_TPDU({'tpdu': '400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d'})) SMSPPDownload(DeviceIdentities({'source_dev_id': 'network', 'dest_dev_id': 'uicc'}),Address({'ton_npi': 0, 'call_number': '0123456'}),SMS_TPDU({'tpdu': '400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d'}))
INFO root: ENVELOPE: d147820283818604001032548b3b400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d INFO root: ENVELOPE: d147820283818604001032548b3b400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d
INFO root: SW 9000: 027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c INFO root: SW 9000: 027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c
.. note:: for sending OTA SMS messages :ref:`smpp-ota-tool` may be used.

View File

@@ -44,7 +44,6 @@ from pySim.exceptions import SwMatchError
from pySim.legacy.cards import card_detect, SimCard, UsimCard, IsimCard from pySim.legacy.cards import card_detect, SimCard, UsimCard, IsimCard
from pySim.utils import dec_imsi, dec_iccid from pySim.utils import dec_imsi, dec_iccid
from pySim.legacy.utils import format_xplmn_w_act, dec_st, dec_msisdn from pySim.legacy.utils import format_xplmn_w_act, dec_st, dec_msisdn
from pySim.ts_51_011 import EF_SMSP
option_parser = argparse.ArgumentParser(description='Legacy tool for reading some parts of a SIM card', option_parser = argparse.ArgumentParser(description='Legacy tool for reading some parts of a SIM card',
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -142,15 +141,6 @@ if __name__ == '__main__':
(res, sw) = card.read_record('SMSP', 1) (res, sw) = card.read_record('SMSP', 1)
if sw == '9000': if sw == '9000':
print("SMSP: %s" % (res,)) print("SMSP: %s" % (res,))
ef_smsp = EF_SMSP()
smsc_a = ef_smsp.decode_record_bin(h2b(res), 1).get('tp_sc_addr', {})
smsc_n = smsc_a.get('call_number', None)
if smsc_a.get('ton_npi', {}).get('type_of_number', None) == 'international' and smsc_n is not None:
smsc = '+' + smsc_n
else:
smsc = smsc_n
if smsc is not None:
print("SMSC: %s" % (smsc,))
else: else:
print("SMSP: Can't read, response code = %s" % (sw,)) print("SMSP: Can't read, response code = %s" % (sw,))

View File

@@ -74,7 +74,7 @@ from pySim.card_key_provider import card_key_provider_register, card_key_provide
from pySim.app import init_card from pySim.app import init_card
log = PySimLogger.get(Path(__file__).stem) log = PySimLogger.get("main")
class Cmd2Compat(cmd2.Cmd): class Cmd2Compat(cmd2.Cmd):
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer """Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
@@ -519,17 +519,8 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
@cmd2.with_category(CUSTOM_CATEGORY) @cmd2.with_category(CUSTOM_CATEGORY)
def do_version(self, opts): def do_version(self, opts):
"""Print the pySim software version.""" """Print the pySim software version."""
from importlib.metadata import version as vsn import pkg_resources
self.poutput("pyosmocom " + vsn('pyosmocom')) self.poutput(pkg_resources.get_distribution('pySim'))
import os
cwd = os.path.dirname(os.path.realpath(__file__))
if os.path.isdir(os.path.join(cwd, ".git")):
import subprocess
url = subprocess.check_output(['git', 'config', '--get', 'remote.origin.url']).decode('ascii').strip()
version = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=cwd).decode('ascii').strip()
self.poutput(os.path.basename(url) + " " + version)
else:
self.poutput("pySim " + vsn('pySim'))
@with_default_category('pySim Commands') @with_default_category('pySim Commands')
class PySimCommands(CommandSet): class PySimCommands(CommandSet):
@@ -1147,10 +1138,10 @@ global_group.add_argument("--verbose", help="Enable verbose logging",
card_key_group = option_parser.add_argument_group('Card Key Provider Options') card_key_group = option_parser.add_argument_group('Card Key Provider Options')
card_key_group.add_argument('--csv', metavar='FILE', card_key_group.add_argument('--csv', metavar='FILE',
default="~/.osmocom/pysim/card_data.csv", default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file') help='Read card data from CSV file')
card_key_group.add_argument('--pgsql', metavar='FILE', card_key_group.add_argument('--pqsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg", default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)') help='Read card data from PostgreSQL database (config file)')
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append', card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key') help=argparse.SUPPRESS, dest='column_key')
@@ -1176,7 +1167,7 @@ if __name__ == '__main__':
# Ensure that we are able to print formatted warnings from the beginning. # Ensure that we are able to print formatted warnings from the beginning.
PySimLogger.setup(print, {logging.WARN: YELLOW}) PySimLogger.setup(print, {logging.WARN: YELLOW})
if opts.verbose: if (opts.verbose):
PySimLogger.set_verbose(True) PySimLogger.set_verbose(True)
PySimLogger.set_level(logging.DEBUG) PySimLogger.set_level(logging.DEBUG)
else: else:
@@ -1189,10 +1180,10 @@ if __name__ == '__main__':
for par in opts.column_key: for par in opts.column_key:
name, key = par.split(':') name, key = par.split(':')
column_keys[name] = key column_keys[name] = key
if os.path.isfile(os.path.expanduser(opts.csv)): if os.path.isfile(opts.csv):
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), column_keys)) card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
if os.path.isfile(os.path.expanduser(opts.pgsql)): if os.path.isfile(opts.pqsql):
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), column_keys)) card_key_provider_register(CardKeyProviderPgsql(opts.pqsql, column_keys))
# Init card reader driver # Init card reader driver
sl = init_reader(opts, proactive_handler = Proact()) sl = init_reader(opts, proactive_handler = Proact())

View File

@@ -23,7 +23,6 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
from pySim.apdu_source.stdin_hex import StdinHexApduSource
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
@@ -191,10 +190,6 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
parser_tcaloader_log.add_argument('-f', '--log-file', required=True, parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
help='Name of the log file to be read') help='Name of the log file to be read')
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
Read APDUs as hex-string from stdin.""")
if __name__ == '__main__': if __name__ == '__main__':
opts = option_parser.parse_args() opts = option_parser.parse_args()
@@ -210,8 +205,6 @@ if __name__ == '__main__':
s = PysharkGsmtapPcap(opts.pcap_file) s = PysharkGsmtapPcap(opts.pcap_file)
elif opts.source == 'tca-loader-log': elif opts.source == 'tca-loader-log':
s = TcaLoaderLogApduSource(opts.log_file) s = TcaLoaderLogApduSource(opts.log_file)
elif opts.source == 'stdin-hex':
s = StdinHexApduSource()
else: else:
raise ValueError("unsupported source %s", opts.source) raise ValueError("unsupported source %s", opts.source)

View File

@@ -84,5 +84,5 @@ class PysharkGsmtapPcap(_PysharkGsmtap):
Args: Args:
pcap_filename: File name of the pcap file to be opened pcap_filename: File name of the pcap file to be opened
""" """
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim || iso7816.atr', use_json=True, keep_packets=False) pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim', use_json=True, keep_packets=False)
super().__init__(pyshark_inst) super().__init__(pyshark_inst)

View File

@@ -1,39 +0,0 @@
# coding=utf-8
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.utils import h2b
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
class StdinHexApduSource(ApduSource):
"""ApduSource for reading apdu hex-strings from stdin."""
def read_packet(self) -> PacketType:
while True:
command = input("C-APDU >")
if len(command) == 0:
continue
response = '9000'
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))

View File

@@ -7,7 +7,7 @@ there are also automatic card feeders.
""" """
# #
# (C) 2019 by sysmocom - s.f.m.c. GmbH # (C) 2019 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify

View File

@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
operation with pySim-shell. operation with pySim-shell.
""" """
# (C) 2021-2025 by sysmocom - s.f.m.c. GmbH # (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier, Harald Welte # Author: Philipp Maier, Harald Welte
@@ -37,8 +37,10 @@ import abc
import csv import csv
import logging import logging
import yaml import yaml
import psycopg2
from psycopg2.sql import Identifier, SQL
log = PySimLogger.get(__name__) log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider'] card_key_providers = [] # type: List['CardKeyProvider']
@@ -58,7 +60,7 @@ class CardKeyFieldCryptor:
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'], 'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'], 'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'], 'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
'SCP03_ISDA': ['SCP03_ENC_ISDA', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'], 'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'], 'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
} }
@@ -197,7 +199,6 @@ class CardKeyProviderPgsql(CardKeyProvider):
config_filename : file name (path) of CSV file containing card-individual key/data config_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor) transport_keys : (see class CardKeyFieldCryptor)
""" """
import psycopg2
log.info("Using SQL database as card key data source: %s" % config_filename) log.info("Using SQL database as card key data source: %s" % config_filename)
with open(config_filename, "r") as cfg: with open(config_filename, "r") as cfg:
config = yaml.load(cfg, Loader=yaml.FullLoader) config = yaml.load(cfg, Loader=yaml.FullLoader)
@@ -215,8 +216,6 @@ class CardKeyProviderPgsql(CardKeyProvider):
self.crypt = CardKeyFieldCryptor(transport_keys) self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]: def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
import psycopg2
from psycopg2.sql import Identifier, SQL
db_result = None db_result = None
for t in self.tables: for t in self.tables:
self.conn.rollback() self.conn.rollback()

View File

@@ -128,7 +128,7 @@ class EF_AD(TransparentEF):
cell_test = 0x04 cell_test = 0x04
def __init__(self, fid='6f43', sfid=None, name='EF.AD', def __init__(self, fid='6f43', sfid=None, name='EF.AD',
desc='Administrative Data', size=(3, None), **kwargs): desc='Service Provider Name', size=(3, None), **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, **kwargs) super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, **kwargs)
self._construct = Struct( self._construct = Struct(
# Byte 1: Display Condition # Byte 1: Display Condition

View File

@@ -54,8 +54,6 @@ def compile_asn1_subdir(subdir_name:str, codec='der'):
__ver = sys.version_info __ver = sys.version_info
if (__ver.major, __ver.minor) >= (3, 9): if (__ver.major, __ver.minor) >= (3, 9):
for i in resources.files('pySim.esim').joinpath('asn1').joinpath(subdir_name).iterdir(): for i in resources.files('pySim.esim').joinpath('asn1').joinpath(subdir_name).iterdir():
if not i.name.endswith('.asn'):
continue
asn_txt += i.read_text() asn_txt += i.read_text()
asn_txt += "\n" asn_txt += "\n"
#else: #else:

View File

@@ -27,7 +27,7 @@ logger.setLevel(logging.DEBUG)
class param: class param:
class Iccid(ApiParamString): class Iccid(ApiParamString):
"""String representation of 18 to 20 digits, where the 20th digit MAY optionally be the padding """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
character F.""" character F."""
@classmethod @classmethod
def _encode(cls, data): def _encode(cls, data):
@@ -40,7 +40,7 @@ class param:
@classmethod @classmethod
def verify_encoded(cls, data): def verify_encoded(cls, data):
if len(data) not in (18, 19, 20): if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
@classmethod @classmethod
@@ -53,7 +53,7 @@ class param:
@classmethod @classmethod
def verify_decoded(cls, data): def verify_decoded(cls, data):
data = str(data) data = str(data)
if len(data) not in (18, 19, 20): if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
if len(data) == 19: if len(data) == 19:
decimal_part = data decimal_part = data

View File

@@ -149,8 +149,7 @@ class ApiError(Exception):
'message': None, 'message': None,
} }
actual_sec = func_ex_status.get('statusCodeData', None) actual_sec = func_ex_status.get('statusCodeData', None)
if actual_sec: sec.update(actual_sec)
sec.update(actual_sec)
self.subject_code = sec['subjectCode'] self.subject_code = sec['subjectCode']
self.reason_code = sec['reasonCode'] self.reason_code = sec['reasonCode']
self.subject_id = sec['subjectIdentifier'] self.subject_id = sec['subjectIdentifier']
@@ -256,10 +255,5 @@ class JsonHttpApiFunction(abc.ABC):
raise HttpHeaderError(response) raise HttpHeaderError(response)
if response.content: if response.content:
if response.headers.get('Content-Type').startswith('application/json'): return self.decode(response.json())
return self.decode(response.json())
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
return { 'data': response.content.decode('utf-8') }
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
return None return None

View File

@@ -21,8 +21,6 @@ import io
import os import os
from typing import Tuple, List, Optional, Dict, Union from typing import Tuple, List, Optional, Dict, Union
from collections import OrderedDict from collections import OrderedDict
from difflib import SequenceMatcher, Match
import asn1tools import asn1tools
import zipfile import zipfile
from pySim import javacard from pySim import javacard
@@ -46,29 +44,6 @@ asn1 = compile_asn1_subdir('saip')
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NonMatch(Match):
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
@classmethod
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
The caller must ensure that the input list is ordered, non-overlapping and only contains
matches at equal offsets in a and b."""
res = []
cur = 0
for match in l:
if match.a != match.b:
raise ValueError('only works for equal-offset matches')
assert match.a >= cur
nm_len = match.a - cur
if nm_len > 0:
# there's no point in generating zero-lenth non-matching sections
res.append(cls(a=cur, b=cur, size=nm_len))
cur = match.a + match.size
if size > cur:
res.append(cls(a=cur, b=cur, size=size-cur))
return res
class Naa: class Naa:
"""A class defining a Network Access Application (NAA)""" """A class defining a Network Access Application (NAA)"""
name = None name = None
@@ -151,8 +126,6 @@ class File:
self.df_name = None self.df_name = None
self.fill_pattern = None self.fill_pattern = None
self.fill_pattern_repeat = False self.fill_pattern_repeat = False
self.pstdo = None # pinStatusTemplateDO, mandatory for DF/ADF
self.lcsi = None # optional life cycle status indicator
# apply some defaults from profile # apply some defaults from profile
if self.template: if self.template:
self.from_template(self.template) self.from_template(self.template)
@@ -171,9 +144,6 @@ class File:
def file_size(self) -> Optional[int]: def file_size(self) -> Optional[int]:
"""Return the size of the file in bytes.""" """Return the size of the file in bytes."""
if self.file_type in ['LF', 'CY']: if self.file_type in ['LF', 'CY']:
if self._file_size and self.nb_rec is None and self.rec_len:
self.nb_rec = self._file_size // self.rec_len
return self.nb_rec * self.rec_len return self.nb_rec * self.rec_len
elif self.file_type in ['TR', 'BT']: elif self.file_type in ['TR', 'BT']:
return self._file_size return self._file_size
@@ -280,8 +250,6 @@ class File:
elif self.file_type in ['MF', 'DF', 'ADF']: elif self.file_type in ['MF', 'DF', 'ADF']:
fdb_dec['file_type'] = 'df' fdb_dec['file_type'] = 'df'
fdb_dec['structure'] = 'no_info_given' fdb_dec['structure'] = 'no_info_given'
# pinStatusTemplateDO is mandatory for DF/ADF
fileDescriptor['pinStatusTemplateDO'] = self.pstdo
# build file descriptor based on above input data # build file descriptor based on above input data
fd_dict = {} fd_dict = {}
if len(fdb_dec): if len(fdb_dec):
@@ -308,8 +276,6 @@ class File:
# desired fill or repeat pattern in the "proprietaryEFInfo" element for the EF in Profiles # desired fill or repeat pattern in the "proprietaryEFInfo" element for the EF in Profiles
# downloaded to a V2.2 or earlier eUICC. # downloaded to a V2.2 or earlier eUICC.
fileDescriptor['proprietaryEFInfo'] = pefi fileDescriptor['proprietaryEFInfo'] = pefi
if self.lcsi:
fileDescriptor['lcsi'] = self.lcsi
logger.debug("%s: to_fileDescriptor(%s)" % (self, fileDescriptor)) logger.debug("%s: to_fileDescriptor(%s)" % (self, fileDescriptor))
return fileDescriptor return fileDescriptor
@@ -325,12 +291,6 @@ class File:
dfName = fileDescriptor.get('dfName', None) dfName = fileDescriptor.get('dfName', None)
if dfName: if dfName:
self.df_name = dfName self.df_name = dfName
efFileSize = fileDescriptor.get('efFileSize', None)
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
self.pstdo = fileDescriptor.get('pinStatusTemplateDO', None)
self.lcsi = fileDescriptor.get('lcsi', None)
pefi = fileDescriptor.get('proprietaryEFInfo', {}) pefi = fileDescriptor.get('proprietaryEFInfo', {})
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None) securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
if securityAttributesReferenced: if securityAttributesReferenced:
@@ -340,11 +300,13 @@ class File:
fdb_dec = fd_dec['file_descriptor_byte'] fdb_dec = fd_dec['file_descriptor_byte']
self.shareable = fdb_dec['shareable'] self.shareable = fdb_dec['shareable']
if fdb_dec['file_type'] == 'working_ef': if fdb_dec['file_type'] == 'working_ef':
efFileSize = fileDescriptor.get('efFileSize', None)
if fd_dec['num_of_rec']: if fd_dec['num_of_rec']:
self.nb_rec = fd_dec['num_of_rec'] self.nb_rec = fd_dec['num_of_rec']
if fd_dec['record_len']: if fd_dec['record_len']:
self.rec_len = fd_dec['record_len'] self.rec_len = fd_dec['record_len']
if efFileSize: if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
if self.rec_len and self.nb_rec == None: if self.rec_len and self.nb_rec == None:
# compute the number of records from file size and record length # compute the number of records from file size and record length
self.nb_rec = self._file_size // self.rec_len self.nb_rec = self._file_size // self.rec_len
@@ -444,40 +406,12 @@ class File:
return ValueError("Unknown key '%s' in tuple list" % k) return ValueError("Unknown key '%s' in tuple list" % k)
return stream.getvalue() return stream.getvalue()
def file_content_to_tuples(self, optimize:bool = False) -> List[Tuple]: def file_content_to_tuples(self) -> List[Tuple]:
"""Encode the file contents into a list of fillFileContent / fillFileOffset tuples that can be fed # FIXME: simplistic approach. needs optimization. We should first check if the content
into the asn.1 encoder. If optimize is True, it will try to encode only the differences from the # matches the expanded default value from the template. If it does, return empty list.
fillFileContent of the profile template. Otherwise, the entire file contents will be encoded # Next, we should compute the diff between the default value and self.body, and encode
as-is.""" # that as a sequence of fillFileOffset and fillFileContent tuples.
if not self.file_type in ['TR', 'LF', 'CY', 'BT']: return [('fillFileContent', self.body)]
return []
if not optimize:
# simplistic approach: encode the full file, ignoring the template/default
return [('fillFileContent', self.body)]
# Try to 'compress' the file body, based on the default file contents.
if self.template:
default = self.template.expand_default_value_pattern(length=len(self.body))
if not default:
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
else:
if default == self.body:
# 100% match: return an empty tuple list to make eUICC use the default
return []
sm = SequenceMatcher(a=default, b=self.body)
else:
# no template at all: we can only remove padding
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
matching_blocks = sm.get_matching_blocks()
# we can only make use of matches that have the same offset in 'a' and 'b'
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
ret = []
cur = 0
for block in non_matching_blocks:
ret.append(('fillFileOffset', block.a - cur))
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
cur += block.size
return ret
def __str__(self) -> str: def __str__(self) -> str:
return "File(%s)" % self.pe_name return "File(%s)" % self.pe_name
@@ -699,15 +633,8 @@ class FsProfileElement(ProfileElement):
self.pe_sequence.cur_df = pe_df self.pe_sequence.cur_df = pe_df
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file) self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
def file2pe(self, file: File):
"""Update the "decoded" member for the given file with the contents from the given File instance.
We expect that the File instance is part of self.files"""
if self.files[file.pe_name] != file:
raise ValueError("The file you passed is not part of this ProfileElement")
self.decoded[file.pe_name] = file.to_tuples()
def files2pe(self): def files2pe(self):
"""Update the "decoded" member for each file with the contents of the "files" member.""" """Update the "decoded" member with the contents of the "files" member."""
for k, f in self.files.items(): for k, f in self.files.items():
self.decoded[k] = f.to_tuples() self.decoded[k] = f.to_tuples()
@@ -1079,13 +1006,6 @@ class SecurityDomainKey:
'keyVersionNumber': bytes([self.key_version_number]), 'keyVersionNumber': bytes([self.key_version_number]),
'keyComponents': [k.to_saip_dict() for k in self.key_components]} 'keyComponents': [k.to_saip_dict() for k in self.key_components]}
def get_key_component(self, key_type):
for kc in self.key_components:
if kc.key_type == key_type:
return kc.key_data
return None
class ProfileElementSD(ProfileElement): class ProfileElementSD(ProfileElement):
"""Class representing a securityDomain ProfileElement.""" """Class representing a securityDomain ProfileElement."""
type = 'securityDomain' type = 'securityDomain'
@@ -1100,7 +1020,6 @@ class ProfileElementSD(ProfileElement):
def __init__(self, decoded: Optional[dict] = None, **kwargs): def __init__(self, decoded: Optional[dict] = None, **kwargs):
super().__init__(decoded, **kwargs) super().__init__(decoded, **kwargs)
if decoded: if decoded:
self._post_decode()
return return
# provide some reasonable defaults for a MNO-SD # provide some reasonable defaults for a MNO-SD
self.decoded['instance'] = { self.decoded['instance'] = {
@@ -1819,7 +1738,8 @@ class ProfileElementSequence:
del hdr.decoded['eUICC-Mandatory-services'][service] del hdr.decoded['eUICC-Mandatory-services'][service]
# remove any associated mandatory filesystem templates # remove any associated mandatory filesystem templates
for template in naa.templates: for template in naa.templates:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)] if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
# determine the ADF names (AIDs) of all NAA ADFs # determine the ADF names (AIDs) of all NAA ADFs
naa_adf_names = [] naa_adf_names = []
if naa.pe_types[0] in self.pe_by_type: if naa.pe_types[0] in self.pe_by_type:
@@ -1862,7 +1782,7 @@ class ProfileElementSequence:
return None return None
@staticmethod @staticmethod
def peclass_for_path(path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]: def peclass_for_path(path: Path) -> Optional[ProfileElement]:
"""Return the ProfileElement class that can contain a file with given path.""" """Return the ProfileElement class that can contain a file with given path."""
naa = ProfileElementSequence.naa_for_path(path) naa = ProfileElementSequence.naa_for_path(path)
if naa: if naa:
@@ -1895,7 +1815,7 @@ class ProfileElementSequence:
return ProfileElementTelecom, ft return ProfileElementTelecom, ft
return ProfileElementGFM, None return ProfileElementGFM, None
def pe_for_path(self, path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]: def pe_for_path(self, path: Path) -> Optional[ProfileElement]:
"""Return the ProfileElement instance that can contain a file with matching path. This will """Return the ProfileElement instance that can contain a file with matching path. This will
either be an existing PE within the sequence, or it will be a newly-allocated PE that is either be an existing PE within the sequence, or it will be a newly-allocated PE that is
inserted into the sequence.""" inserted into the sequence."""
@@ -1961,10 +1881,7 @@ class ProfileElementSequence:
class FsNode: class FsNode:
"""A node in the filesystem hierarchy. Each node can have a parent node and any number of children. """A node in the filesystem hierarchy."""
Each node is identified uniquely within the parent by its numeric FID and its optional human-readable
name. Each node usually is associated with an instance of the File class for the actual content of
the file. FsNode is the base class used by more specific nodes, such as FsNode{EF,DF,ADF,MF}."""
def __init__(self, fid: int, parent: Optional['FsNode'], file: Optional[File] = None, def __init__(self, fid: int, parent: Optional['FsNode'], file: Optional[File] = None,
name: Optional[str] = None): name: Optional[str] = None):
self.fid = fid self.fid = fid
@@ -2019,7 +1936,7 @@ class FsNode:
return x return x
def walk(self, fn, **kwargs): def walk(self, fn, **kwargs):
"""call 'fn(self, ``**kwargs``) for the File.""" """call 'fn(self, **kwargs) for the File."""
return [fn(self, **kwargs)] return [fn(self, **kwargs)]
class FsNodeEF(FsNode): class FsNodeEF(FsNode):
@@ -2109,7 +2026,7 @@ class FsNodeDF(FsNode):
return cur return cur
def walk(self, fn, **kwargs): def walk(self, fn, **kwargs):
"""call 'fn(self, ``**kwargs``) for the DF and recursively for all children.""" """call 'fn(self, **kwargs) for the DF and recursively for all children."""
ret = super().walk(fn, **kwargs) ret = super().walk(fn, **kwargs)
for c in self.children.values(): for c in self.children.values():
ret += c.walk(fn, **kwargs) ret += c.walk(fn, **kwargs)

View File

@@ -1,360 +0,0 @@
"""Implementation of Personalization of eSIM profiles in SimAlliance/TCA Interoperable Profile:
Run a batch of N personalizations"""
# (C) 2025-2026 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: nhofmeyr@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import pprint
from typing import List, Generator
from pySim.esim.saip.personalization import ConfigurableParameter
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
from pySim.global_platform import KeyUsageQualifier
from osmocom.utils import b2h
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
Usage example:
der_input = some_file.open('rb').read()
pes = ProfileElementSequence.from_der(der_input)
p = pers.BatchPersonalization(
n=10,
src_pes=pes,
csv_rows=get_csv_reader())
p.add_param_and_src(
personalization.Iccid(),
param_source.IncDigitSource(
num_digits=18,
first_value=123456789012340001,
last_value=123456789012340010))
# add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile
# ...
# generate all 10 profiles (from n=10 above)
for result_pes in p.generate_profiles():
upp = result_pes.to_der()
store_upp(upp)
"""
class ParamAndSrc:
"""tie a ConfigurableParameter to a source of actual values"""
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
if isinstance(param, type):
self.param_cls = param
else:
self.param_cls = param.__class__
self.src = src
def __init__(self,
n: int,
src_pes: ProfileElementSequence,
params: list[ParamAndSrc]=[],
csv_rows: Generator=None,
):
"""
n: number of eSIM profiles to generate.
src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only
copied.
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
profile values.
csv_rows: A generator (e.g. iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
param_source.CsvSource.
"""
self.n = n
self.params = params or []
self.src_pes = src_pes
self.csv_rows = csv_rows
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
self.params.append(BatchPersonalization.ParamAndSrc(param, src))
def generate_profiles(self):
# get first row of CSV: column names
csv_columns = None
if self.csv_rows:
try:
csv_columns = next(self.csv_rows)
except StopIteration as e:
raise ValueError('the input CSV file appears to be empty') from e
for i in range(self.n):
csv_row = None
if self.csv_rows and csv_columns:
try:
csv_row_list = next(self.csv_rows)
except StopIteration as e:
raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
csv_row = dict(zip(csv_columns, csv_row_list))
pes = copy.deepcopy(self.src_pes)
for p in self.params:
try:
input_value = p.src.get_next(csv_row=csv_row)
assert input_value is not None
value = p.param_cls.validate_val(input_value)
p.param_cls.apply_val(pes, value)
except Exception as e:
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
yield pes
class UppAudit(dict):
"""
Key-value pairs collected from a single UPP DER or PES.
UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP.
"""
@classmethod
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
both 'IMSI' and 'IMSI-ACC' parameters.
e.g.
UppAudit.from_der(my_der, [Imsi, ])
--> {'IMSI': '001010000000023', 'IMSI-ACC': '5'}
(where 'IMSI' == Imsi.name)
Read all parameters listed in params. params is a list of either ConfigurableParameter classes or
ConfigurableParameter class instances. This calls only classmethods, so each entry in params can either be the
class itself, or a class-instance of, a (non-abstract) ConfigurableParameter subclass.
For example, params = [Imsi, ] is equivalent to params = [Imsi(), ].
For der_size=True, also include a {'der_size':12345} entry.
For additional_sd_keys=True, output also all Security Domain KVN that there are *no* ConfigurableParameter
subclasses for. For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only Scp80Kvn01, Scp80Kvn02,
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
unexpected / unusual kvn.
"""
# make an instance of this class
upp_audit = cls()
if der_size:
upp_audit['der_size'] = set((len(der), ))
pes = ProfileElementSequence.from_der(der)
for param in params:
try:
for valdict in param.get_values_from_pes(pes):
upp_audit.add_values(valdict)
except Exception as e:
raise ValueError(f'Error during audit for parameter {param}: {e}') from e
if not additional_sd_keys:
return upp_audit
# additional_sd_keys
for pe in pes.pe_list:
if pe.type != 'securityDomain':
continue
assert isinstance(pe, ProfileElementSD)
for key in pe.keys:
audit_key = f'SdKey_KVN{key.key_version_number:02x}_ID{key.key_identifier:02x}'
kuq_bin = KeyUsageQualifier.build(key.key_usage_qualifier).hex()
audit_val = f'{key.key_components=!r} key_usage_qualifier=0x{kuq_bin}={key.key_usage_qualifier!r}'
upp_audit[audit_key] = set((audit_val, ))
return upp_audit
def get_single_val(self, key, validate=True, allow_absent=False, absent_val=None):
"""
Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
return that value. When they do not agree, raise a ValueError.
"""
# key should be a string, but if someone passes a ConfigurableParameter, just use its default name
if ConfigurableParameter.is_super_of(key):
key = key.get_name()
assert isinstance(key, str)
v = self.get(key)
if v is None and allow_absent:
return absent_val
if not isinstance(v, set):
raise ValueError(f'audit value should be a set(), got {v!r}')
if len(v) != 1:
raise ValueError(f'expected a single value for {key}, got {v!r}')
v = tuple(v)[0]
return v
@staticmethod
def audit_val_to_str(v):
"""
Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
audit values are always python sets. Turn it into a nice string representation: only the value when it is
unambiguous, otherwise a list of the ambiguous values.
A value may also be completely absent, then return 'not present'.
"""
def try_single_val(w):
'change single-entry sets to just the single value'
if isinstance(w, set):
if len(w) == 1:
return tuple(w)[0]
if len(w) == 0:
return None
return w
v = try_single_val(v)
if isinstance(v, bytes):
v = bytes_to_hexstr(v)
if v is None:
return 'not present'
return str(v)
def get_val_str(self, key):
"""Return a string of the value stored for the given key"""
return UppAudit.audit_val_to_str(self.get(key))
def add_values(self, src:dict):
"""self and src are both a dict of sets.
For example from
self == { 'a': set((123,)) }
and
src == { 'a': set((456,)), 'b': set((789,)) }
then after this function call:
self == { 'a': set((123, 456,)), 'b': set((789,)) }
"""
assert isinstance(src, dict)
for key, srcvalset in src.items():
dstvalset = self.get(key)
if dstvalset is None:
dstvalset = set()
self[key] = dstvalset
dstvalset.add(srcvalset)
def __str__(self):
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
class BatchAudit(list):
"""
Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
Produce an output CSV.
Usage example:
ba = BatchAudit(params=(personalization.Iccid, ))
for upp_der in upps:
ba.add_audit(upp_der)
print(ba.summarize())
with open('output.csv', 'wb') as csv_data:
csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
csv.writer(csv_str).writerows( ba.to_csv_rows() )
csv_str.flush()
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
"""
def __init__(self, params:List):
assert params
self.params = params
def add_audit(self, upp_der:bytes):
audit = UppAudit.from_der(upp_der, self.params)
self.append(audit)
return audit
def summarize(self):
batch_audit = UppAudit()
audits = self
if len(audits) > 2:
val_sep = ', ..., '
else:
val_sep = ', '
first_audit = None
last_audit = None
if len(audits) >= 1:
first_audit = audits[0]
if len(audits) >= 2:
last_audit = audits[-1]
if first_audit:
if last_audit:
for key in first_audit.keys():
first_val = first_audit.get_val_str(key)
last_val = last_audit.get_val_str(key)
if first_val == last_val:
val = first_val
else:
val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
val = val_sep_with_newline.join((first_val, last_val))
batch_audit[key] = val
else:
batch_audit.update(first_audit)
return batch_audit
def to_csv_rows(self, headers=True, sort_key=None):
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
columns = set()
for audit in self:
columns.update(audit.keys())
columns = tuple(sorted(columns, key=sort_key))
if headers:
yield columns
for audit in self:
yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
def bytes_to_hexstr(b:bytes, sep=''):
return sep.join(f'{x:02x}' for x in b)
def esim_profile_introspect(upp):
pes = ProfileElementSequence.from_der(upp.read())
d = {}
d['upp'] = repr(pes)
def show_bytes_as_hexdump(item):
if isinstance(item, bytes):
return bytes_to_hexstr(item)
if isinstance(item, list):
return list(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, tuple):
return tuple(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, dict):
d = {}
for k, v in item.items():
d[k] = show_bytes_as_hexdump(v)
return d
return item
l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
d['pp'] = pprint.pformat(l, width=120)
return d

View File

@@ -1,229 +0,0 @@
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: nhofmeyr@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import secrets
import re
from osmocom.utils import b2h
class ParamSourceExn(Exception):
pass
class ParamSourceExhaustedExn(ParamSourceExn):
pass
class ParamSourceUndefinedExn(ParamSourceExn):
pass
class ParamSource:
"""abstract parameter source. For usage, see personalization.BatchPersonalization."""
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
name = "none"
numeric_base = None # or 10 or 16
def __init__(self, input_str:str):
"""Subclasses should call super().__init__(input_str) before evaluating self.input_str. Each subclass __init__()
may in turn manipulate self.input_str to apply expansions or decodings."""
self.input_str = input_str
def get_next(self, csv_row:dict=None):
"""Subclasses implement this: return the next value from the parameter source.
When there are no more values from the source, raise a ParamSourceExhaustedExn.
This default implementation is an empty source."""
raise ParamSourceExhaustedExn()
@classmethod
def from_str(cls, input_str:str):
"""compatibility with earlier version of ParamSource. Just use the constructor."""
return cls(input_str)
class ConstantSource(ParamSource):
"""one value for all"""
name = "constant"
def get_next(self, csv_row:dict=None):
return self.input_str
class InputExpandingParamSource(ParamSource):
def __init__(self, input_str:str):
super().__init__(input_str)
self.input_str = self.expand_input_str(self.input_str)
@classmethod
def expand_input_str(cls, input_str:str):
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
if "*" not in input_str:
return input_str
# re: "XX * 123" with optional spaces
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", input_str)
if len(tokens) < 3:
return input_str
parts = []
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
parts.append(unchanged)
repeat = int(repeat_str)
parts.append(snippet * repeat)
return "".join(parts)
class DecimalRangeSource(InputExpandingParamSource):
"""abstract: decimal numbers with a value range"""
numeric_base = 10
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
"""Constructor to set up values from a (user entered) string: DecimalRangeSource(input_str).
Constructor to set up values directly: DecimalRangeSource(num_digits=3, first_value=123, last_value=456)
num_digits produces leading zeros when first_value..last_value are shorter.
"""
assert ((input_str is not None and (num_digits, first_value, last_value) == (None, None, None))
or (input_str is None and None not in (num_digits, first_value, last_value)))
if input_str is not None:
super().__init__(input_str)
input_str = self.input_str
if ".." in input_str:
first_str, last_str = input_str.split('..')
first_str = first_str.strip()
last_str = last_str.strip()
else:
first_str = input_str.strip()
last_str = None
num_digits = len(first_str)
first_value = int(first_str)
last_value = int(last_str if last_str is not None else "9" * num_digits)
assert num_digits > 0
assert first_value <= last_value
self.num_digits = num_digits
self.first_value = first_value
self.last_value = last_value
def val_to_digit(self, val:int):
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
class RandomSourceMixin:
random_impl = secrets.SystemRandom()
class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
"""return a different sequence of random decimal digits each"""
name = "random decimal digits"
used_keys = set()
def get_next(self, csv_row:dict=None):
# try to generate random digits that are always different from previously produced random bytes
attempts = 10
while True:
val = self.random_impl.randint(self.first_value, self.last_value)
if val in RandomDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomDigitSource.used_keys.add(val)
break
return self.val_to_digit(val)
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
"""return a different sequence of random hexadecimal digits each"""
name = "random hexadecimal digits"
numeric_base = 16
used_keys = set()
def __init__(self, input_str:str):
super().__init__(input_str)
input_str = self.input_str
num_digits = len(input_str.strip())
if num_digits < 1:
raise ValueError("zero number of digits")
# hex digits always come in two
if (num_digits & 1) != 0:
raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}")
self.num_digits = num_digits
def get_next(self, csv_row:dict=None):
# try to generate random bytes that are always different from previously produced random bytes
attempts = 10
while True:
val = self.random_impl.randbytes(self.num_digits // 2)
if val in RandomHexDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomHexDigitSource.used_keys.add(val)
break
return b2h(val)
class IncDigitSource(DecimalRangeSource):
"""incrementing sequence of digits"""
name = "incrementing decimal digits"
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
"""input_str: the first value to return, a string of an integer number with optional leading zero digits. The
leading zero digits are preserved."""
super().__init__(input_str, num_digits, first_value, last_value)
self.next_val = None
self.reset()
def reset(self):
"""Restart from the first value of the defined range passed to __init__()."""
self.next_val = self.first_value
def get_next(self, csv_row:dict=None):
val = self.next_val
if val is None:
raise ParamSourceExhaustedExn()
returnval = self.val_to_digit(val)
val += 1
if val > self.last_value:
self.next_val = None
else:
self.next_val = val
return returnval
class CsvSource(ParamSource):
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
name = "from CSV"
def __init__(self, input_str:str):
"""self.csv_column = input_str:
column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
"""
"""Parse input_str into self.num_digits, self.first_value, self.last_value."""
super().__init__(input_str)
self.csv_column = self.input_str
def get_next(self, csv_row:dict=None):
val = None
if csv_row:
val = csv_row.get(self.csv_column)
if not val:
raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}")
return val

File diff suppressed because it is too large Load Diff

View File

@@ -673,7 +673,7 @@ class FilesUsimDf5GS(ProfileTemplate):
FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]), FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]),
FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]), FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]),
FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]), FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130], pe_name='ef-supinai'), FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130]),
FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]), FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]),
] ]
@@ -818,7 +818,7 @@ class FilesIsimOptional(ProfileTemplate):
base_path = Path('ADF.ISIM') base_path = Path('ADF.ISIM')
extends = FilesIsimMandatory extends = FilesIsimMandatory
files = [ files = [
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5], pe_name='ef-pcscf'), FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5]),
FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]), FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]),
FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]), FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]),
FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]), FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]),

View File

@@ -103,26 +103,6 @@ class CheckBasicStructure(ProfileConstraintChecker):
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs): if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
raise ProfileError('profile-a-p256 mandatory, but no usim or isim') raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
def check_mandatory_services_aka(self, pes: ProfileElementSequence):
"""Ensure that no unnecessary authentication related services are marked as mandatory but not
actually used within the profile"""
m_svcs = pes.get_pe_for_type('header').decoded['eUICC-Mandatory-services']
# list of tuples (algo_id, key_len_in_octets) for all the akaParameters in the PE Sequence
algo_id_klen = [(x.decoded['algoConfiguration'][1]['algorithmID'],
len(x.decoded['algoConfiguration'][1]['key'])) for x in pes.get_pes_for_type('akaParameter')]
# just a plain list of algorithm IDs in akaParameters
algorithm_ids = [x[0] for x in algo_id_klen]
if 'milenage' in m_svcs and not 1 in algorithm_ids:
raise ProfileError('milenage mandatory, but no related algorithm_id in akaParameter')
if 'tuak128' in m_svcs and not (2, 128/8) in algo_id_klen:
raise ProfileError('tuak128 mandatory, but no related algorithm_id in akaParameter')
if 'cave' in m_svcs and not pes.get_pe_for_type('cdmaParameter'):
raise ProfileError('cave mandatory, but no related cdmaParameter')
if 'tuak256' in m_svcs and (2, 256/8) in algo_id_klen:
raise ProfileError('tuak256 mandatory, but no related algorithm_id in akaParameter')
if 'usim-test-algorithm' in m_svcs and not 3 in algorithm_ids:
raise ProfileError('usim-test-algorithm mandatory, but no related algorithm_id in akaParameter')
def check_identification_unique(self, pes: ProfileElementSequence): def check_identification_unique(self, pes: ProfileElementSequence):
"""Ensure that each PE has a unique identification value.""" """Ensure that each PE has a unique identification value."""
id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header] id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header]

View File

@@ -181,7 +181,7 @@ class SeqNumber(BER_TLV_IE, tag=0x80):
class NotificationAddress(BER_TLV_IE, tag=0x0c): class NotificationAddress(BER_TLV_IE, tag=0x0c):
_construct = Utf8Adapter(GreedyBytes) _construct = Utf8Adapter(GreedyBytes)
class Iccid(BER_TLV_IE, tag=0x5a): class Iccid(BER_TLV_IE, tag=0x5a):
_construct = PaddedBcdAdapter(GreedyBytes) _construct = BcdAdapter(GreedyBytes)
class NotificationMetadata(BER_TLV_IE, tag=0xbf2f, nested=[SeqNumber, ProfileMgmtOperation, class NotificationMetadata(BER_TLV_IE, tag=0xbf2f, nested=[SeqNumber, ProfileMgmtOperation,
NotificationAddress, Iccid]): NotificationAddress, Iccid]):
pass pass

View File

@@ -1,6 +1,6 @@
# GlobalPlatform install parameter generator # GlobalPlatform install parameter generator
# #
# (C) 2024 by sysmocom - s.f.m.c. GmbH # (C) 2024 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify

View File

@@ -266,13 +266,11 @@ class SCP02(SCP):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def dek_encrypt(self, plaintext:bytes) -> bytes: def dek_encrypt(self, plaintext:bytes) -> bytes:
# See also GPC section B.1.1.2, E.4.7, and E.4.1 cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
return cipher.encrypt(plaintext) return cipher.encrypt(plaintext)
def dek_decrypt(self, ciphertext:bytes) -> bytes: def dek_decrypt(self, ciphertext:bytes) -> bytes:
# See also GPC section B.1.1.2, E.4.7, and E.4.1 cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
return cipher.decrypt(ciphertext) return cipher.decrypt(ciphertext)
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes): def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):

View File

@@ -91,7 +91,6 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
# Key Usage: # Key Usage:
# KVN 0x01 .. 0x0F reserved for SCP80 # KVN 0x01 .. 0x0F reserved for SCP80
# KVN 0x81 .. 0x8f reserved for SCP81
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226 # KVN 0x11 reserved for DAP specified in ETSI TS 102 226
# KVN 0x20 .. 0x2F reserved for SCP02 # KVN 0x20 .. 0x2F reserved for SCP02
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK # KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK

View File

@@ -1,6 +1,6 @@
# JavaCard related utilities # JavaCard related utilities
# #
# (C) 2024 by sysmocom - s.f.m.c. GmbH # (C) 2024 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify

View File

@@ -4,7 +4,7 @@
""" """
# #
# (C) 2025 by sysmocom - s.f.m.c. GmbH # (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier <pmaier@sysmocom.de> # Author: Philipp Maier <pmaier@sysmocom.de>
@@ -44,7 +44,7 @@ class PySimLogger:
""" """
LOG_FMTSTR = "%(levelname)s: %(message)s" LOG_FMTSTR = "%(levelname)s: %(message)s"
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- " + LOG_FMTSTR LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
__formatter = logging.Formatter(LOG_FMTSTR) __formatter = logging.Formatter(LOG_FMTSTR)
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE) __formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
@@ -108,7 +108,7 @@ class PySimLogger:
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record) formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
color = PySimLogger.colors.get(record.levelno) color = PySimLogger.colors.get(record.levelno)
if color: if color:
if isinstance(color, str): if type(color) is str:
PySimLogger.print_callback(color + formatted_message + "\033[0m") PySimLogger.print_callback(color + formatted_message + "\033[0m")
else: else:
PySimLogger.print_callback(style(formatted_message, fg = color)) PySimLogger.print_callback(style(formatted_message, fg = color))

View File

@@ -57,13 +57,12 @@ CompactRemoteResp = Struct('number_of_commands'/Int8ub,
'last_response_data'/HexAdapter(GreedyBytes)) 'last_response_data'/HexAdapter(GreedyBytes))
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3) RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
CNTR_REQ = Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, counter_must_be_higher=2, counter_must_be_lower=3)
POR_REQ = Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2)
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2 # TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
SPI = BitStruct( # first octet SPI = BitStruct( # first octet
Padding(3), Padding(3),
'counter'/CNTR_REQ, 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
counter_must_be_higher=2, counter_must_be_lower=3),
'ciphering'/Flag, 'ciphering'/Flag,
'rc_cc_ds'/RC_CC_DS, 'rc_cc_ds'/RC_CC_DS,
# second octet # second octet
@@ -71,7 +70,8 @@ SPI = BitStruct( # first octet
'por_in_submit'/Flag, 'por_in_submit'/Flag,
'por_shall_be_ciphered'/Flag, 'por_shall_be_ciphered'/Flag,
'por_rc_cc_ds'/RC_CC_DS, 'por_rc_cc_ds'/RC_CC_DS,
'por'/POR_REQ 'por'/Enum(BitsInteger(2), no_por=0,
por_required=1, por_only_when_error=2)
) )
# TS 102 225 Section 5.1.2 # TS 102 225 Section 5.1.2

View File

@@ -4,7 +4,7 @@
""" """
# #
# (C) 2021 by sysmocom - s.f.m.c. GmbH # (C) 2021 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify

View File

@@ -1,5 +1,4 @@
# coding=utf-8 # coding=utf-8
"""Representation of the runtime state of an application like pySim-shell. """Representation of the runtime state of an application like pySim-shell.
""" """
@@ -26,7 +25,7 @@ from pySim.exceptions import *
from pySim.filesystem import * from pySim.filesystem import *
from pySim.log import PySimLogger from pySim.log import PySimLogger
log = PySimLogger.get(__name__) log = PySimLogger.get("RUNTIME")
def lchan_nr_from_cla(cla: int) -> int: def lchan_nr_from_cla(cla: int) -> int:
"""Resolve the logical channel number from the CLA byte.""" """Resolve the logical channel number from the CLA byte."""
@@ -116,7 +115,7 @@ class RuntimeState:
for a in aids_unknown: for a in aids_unknown:
log.info(" unknown: %s (EF.DIR)" % a) log.info(" unknown: %s (EF.DIR)" % a)
else: else:
log.warning("EF.DIR seems to be empty!") log.warn("EF.DIR seems to be empty!")
# Some card applications may not be registered in EF.DIR, we will actively # Some card applications may not be registered in EF.DIR, we will actively
# probe for those applications # probe for those applications
@@ -557,8 +556,8 @@ class RuntimeLchan:
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" % raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
(data_len, writeable_name, writeable_size, data_len - writeable_size)) (data_len, writeable_name, writeable_size, data_len - writeable_size))
elif data_len < writeable_size: elif data_len < writeable_size:
log.warning("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" % log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name)) (data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
def update_binary(self, data_hex: str, offset: int = 0): def update_binary(self, data_hex: str, offset: int = 0):
"""Update transparent EF binary data. """Update transparent EF binary data.

View File

@@ -3,6 +3,18 @@
""" pySim: PCSC reader transport link base """ pySim: PCSC reader transport link base
""" """
import os
import abc
import argparse
from typing import Optional, Tuple
from construct import Construct
from osmocom.utils import b2h, h2b, i2h, Hexstr
from pySim.exceptions import *
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
#
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com> # Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
# Copyright (C) 2021-2023 Harald Welte <laforge@osmocom.org> # Copyright (C) 2021-2023 Harald Welte <laforge@osmocom.org>
# #
@@ -18,20 +30,8 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import abc
import argparse
from typing import Optional, Tuple
from construct import Construct
from osmocom.utils import b2h, h2b, i2h, Hexstr
from pySim.exceptions import *
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
from pySim.log import PySimLogger
log = PySimLogger.get(__name__)
class ApduTracer: class ApduTracer:
def trace_command(self, cmd): def trace_command(self, cmd):
@@ -46,11 +46,11 @@ class ApduTracer:
class StdoutApduTracer(ApduTracer): class StdoutApduTracer(ApduTracer):
"""Minimalistic APDU tracer, printing commands to stdout.""" """Minimalistic APDU tracer, printing commands to stdout."""
def trace_response(self, cmd, sw, resp): def trace_response(self, cmd, sw, resp):
log.info("-> %s %s", cmd[:10], cmd[10:]) print("-> %s %s" % (cmd[:10], cmd[10:]))
log.info("<- %s: %s", sw, resp) print("<- %s: %s" % (sw, resp))
def trace_reset(self): def trace_reset(self):
log.info("-- RESET") print("-- RESET")
class ProactiveHandler(abc.ABC): class ProactiveHandler(abc.ABC):
"""Abstract base class representing the interface of some code that handles """Abstract base class representing the interface of some code that handles
@@ -177,7 +177,7 @@ class LinkBase(abc.ABC):
if self.apdu_strict: if self.apdu_strict:
raise ValueError(exeption_str) raise ValueError(exeption_str)
else: else:
log.warning(exeption_str) print('Warning: %s' % exeption_str)
return (data, sw) return (data, sw)
@@ -211,7 +211,7 @@ class LinkBase(abc.ABC):
# parse the proactive command # parse the proactive command
pcmd = ProactiveCommand() pcmd = ProactiveCommand()
parsed = pcmd.from_tlv(h2b(fetch_rv[0])) parsed = pcmd.from_tlv(h2b(fetch_rv[0]))
log.info("FETCH: %s (%s)", fetch_rv[0], type(parsed).__name__) print("FETCH: %s (%s)" % (fetch_rv[0], type(parsed).__name__))
if self.proactive_handler: if self.proactive_handler:
# Extension point: If this does return a list of TLV objects, # Extension point: If this does return a list of TLV objects,
# they could be appended after the Result; if the first is a # they could be appended after the Result; if the first is a
@@ -361,13 +361,13 @@ def init_reader(opts, **kwargs) -> LinkBase:
from pySim.transport.modem_atcmd import ModemATCommandLink from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs) sl = ModemATCommandLink(opts, **kwargs)
else: # Serial reader is default else: # Serial reader is default
log.warning("No reader/driver specified; falling back to default (Serial reader)") print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink from pySim.transport.serial import SerialSimLink
sl = SerialSimLink(opts, **kwargs) sl = SerialSimLink(opts, **kwargs)
if os.environ.get('PYSIM_INTEGRATION_TEST') == "1": if os.environ.get('PYSIM_INTEGRATION_TEST') == "1":
log.info("Using %s reader interface" % (sl.name)) print("Using %s reader interface" % (sl.name))
else: else:
log.info("Using reader %s" % sl) print("Using reader %s" % sl)
return sl return sl

View File

@@ -166,7 +166,7 @@ class ModemATCommandLink(LinkBaseTpdu):
# Make sure that the response has format: b'+CSIM: %d,\"%s\"' # Make sure that the response has format: b'+CSIM: %d,\"%s\"'
try: try:
result = re.match(rb'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp) result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
(_rsp_tpdu_len, rsp_tpdu) = result.groups() (_rsp_tpdu_len, rsp_tpdu) = result.groups()
except Exception as exc: except Exception as exc:
raise ReaderError('Failed to parse response from modem: %s' % rsp) from exc raise ReaderError('Failed to parse response from modem: %s' % rsp) from exc

View File

@@ -486,17 +486,17 @@ class EF_UST(EF_UServiceTable):
# TS 31.103 Section 4.2.7 - *not* the same as DF.GSM/EF.ECC! # TS 31.103 Section 4.2.7 - *not* the same as DF.GSM/EF.ECC!
class EF_ECC(LinFixedEF): class EF_ECC(LinFixedEF):
_test_de_encode = [ _test_de_encode = [
( '19f1ff01', { "call_code": "911", ( '19f1ff01', { "call_code": "911f",
"service_category": { "police": True, "ambulance": False, "fire_brigade": False, "service_category": { "police": True, "ambulance": False, "fire_brigade": False,
"marine_guard": False, "mountain_rescue": False, "marine_guard": False, "mountain_rescue": False,
"manual_ecall": False, "automatic_ecall": False } } ), "manual_ecall": False, "automatic_ecall": False } } ),
( '19f3ff02', { "call_code": "913", ( '19f3ff02', { "call_code": "913f",
"service_category": { "police": False, "ambulance": True, "fire_brigade": False, "service_category": { "police": False, "ambulance": True, "fire_brigade": False,
"marine_guard": False, "mountain_rescue": False, "marine_guard": False, "mountain_rescue": False,
"manual_ecall": False, "automatic_ecall": False } } ), "manual_ecall": False, "automatic_ecall": False } } ),
] ]
_test_no_pad = True _test_no_pad = True
cc_construct = PaddedBcdAdapter(Rpad(Bytes(3))) cc_construct = BcdAdapter(Rpad(Bytes(3)))
category_construct = FlagsEnum(Byte, police=1, ambulance=2, fire_brigade=3, marine_guard=4, category_construct = FlagsEnum(Byte, police=1, ambulance=2, fire_brigade=3, marine_guard=4,
mountain_rescue=5, manual_ecall=6, automatic_ecall=7) mountain_rescue=5, manual_ecall=6, automatic_ecall=7)
alpha_construct = GsmOrUcs2Adapter(Rpad(GreedyBytes)) alpha_construct = GsmOrUcs2Adapter(Rpad(GreedyBytes))
@@ -596,7 +596,7 @@ class EF_ICI(CyclicEF):
self._construct = Struct('alpha_id'/Bytes(this._.total_len-28), self._construct = Struct('alpha_id'/Bytes(this._.total_len-28),
'len_of_bcd_contents'/Int8ub, 'len_of_bcd_contents'/Int8ub,
'ton_npi'/Int8ub, 'ton_npi'/Int8ub,
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))), 'call_number'/BcdAdapter(Bytes(10)),
'cap_cfg2_record_id'/Int8ub, 'cap_cfg2_record_id'/Int8ub,
'ext5_record_id'/Int8ub, 'ext5_record_id'/Int8ub,
'date_and_time'/BcdAdapter(Bytes(7)), 'date_and_time'/BcdAdapter(Bytes(7)),
@@ -612,7 +612,7 @@ class EF_OCI(CyclicEF):
self._construct = Struct('alpha_id'/Bytes(this._.total_len-27), self._construct = Struct('alpha_id'/Bytes(this._.total_len-27),
'len_of_bcd_contents'/Int8ub, 'len_of_bcd_contents'/Int8ub,
'ton_npi'/Int8ub, 'ton_npi'/Int8ub,
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))), 'call_number'/BcdAdapter(Bytes(10)),
'cap_cfg2_record_id'/Int8ub, 'cap_cfg2_record_id'/Int8ub,
'ext5_record_id'/Int8ub, 'ext5_record_id'/Int8ub,
'date_and_time'/BcdAdapter(Bytes(7)), 'date_and_time'/BcdAdapter(Bytes(7)),
@@ -1118,7 +1118,7 @@ class EF_Routing_Indicator(TransparentEF):
# responsibility of home network operator but BCD coding shall be used. If a network # responsibility of home network operator but BCD coding shall be used. If a network
# operator decides to assign less than 4 digits to Routing Indicator, the remaining digits # operator decides to assign less than 4 digits to Routing Indicator, the remaining digits
# shall be coded as "1111" to fill the 4 digits coding of Routing Indicator # shall be coded as "1111" to fill the 4 digits coding of Routing Indicator
self._construct = Struct('routing_indicator'/PaddedBcdAdapter(Rpad(Bytes(2))), self._construct = Struct('routing_indicator'/Rpad(BcdAdapter(Bytes(2)), 'f', 2),
'rfu'/Bytes(2)) 'rfu'/Bytes(2))
# TS 31.102 Section 4.4.11.13 (Rel 16) # TS 31.102 Section 4.4.11.13 (Rel 16)

View File

@@ -40,7 +40,6 @@ from osmocom.utils import *
from osmocom.construct import * from osmocom.construct import *
from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_plmn, dec_xplmn_w_act from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_plmn, dec_xplmn_w_act
from pySim.utils import bytes_for_nibbles
from pySim.profile import CardProfile, CardProfileAddon from pySim.profile import CardProfile, CardProfileAddon
from pySim.filesystem import * from pySim.filesystem import *
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
@@ -152,7 +151,7 @@ class EF_ADN(LinFixedEF):
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))), self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
'len_of_bcd'/Int8ub, 'len_of_bcd'/Int8ub,
'ton_npi'/TonNpi, 'ton_npi'/TonNpi,
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))), 'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
'cap_conf_id'/Int8ub, 'cap_conf_id'/Int8ub,
ext_name/Int8ub) ext_name/Int8ub)
@@ -193,11 +192,11 @@ class EF_MSISDN(LinFixedEF):
( 'ffffffffffffffffffffffffffffffffffffffff04b12143f5ffffffffffffffffff', ( 'ffffffffffffffffffffffffffffffffffffffff04b12143f5ffffffffffffffffff',
{"alpha_id": "", "len_of_bcd": 4, "ton_npi": {"ext": True, "type_of_number": "network_specific", {"alpha_id": "", "len_of_bcd": 4, "ton_npi": {"ext": True, "type_of_number": "network_specific",
"numbering_plan_id": "isdn_e164"}, "numbering_plan_id": "isdn_e164"},
"dialing_nr": "12345"}), "dialing_nr": "12345f"}),
( '456967656e65205275666e756d6d6572ffffffff0891947172199181f3ffffffffff', ( '456967656e65205275666e756d6d6572ffffffff0891947172199181f3ffffffffff',
{"alpha_id": "Eigene Rufnummer", "len_of_bcd": 8, "ton_npi": {"ext": True, "type_of_number": "international", {"alpha_id": "Eigene Rufnummer", "len_of_bcd": 8, "ton_npi": {"ext": True, "type_of_number": "international",
"numbering_plan_id": "isdn_e164"}, "numbering_plan_id": "isdn_e164"},
"dialing_nr": "4917279119183"}), "dialing_nr": "4917279119183f"}),
] ]
# Ensure deprecated representations still work # Ensure deprecated representations still work
@@ -215,7 +214,7 @@ class EF_MSISDN(LinFixedEF):
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))), self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
'len_of_bcd'/Int8ub, 'len_of_bcd'/Int8ub,
'ton_npi'/TonNpi, 'ton_npi'/TonNpi,
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))), 'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
Padding(2, pattern=b'\xff')) Padding(2, pattern=b'\xff'))
# Maintain compatibility with deprecated representations # Maintain compatibility with deprecated representations
@@ -240,20 +239,11 @@ class EF_MSISDN(LinFixedEF):
# TS 51.011 Section 10.5.6 # TS 51.011 Section 10.5.6
class EF_SMSP(LinFixedEF): class EF_SMSP(LinFixedEF):
_test_de_encode = [ # FIXME: re-encode fails / missing alpha_id at start of output
( '534d5343ffffffffffffffffffffffffe1ffffffffffffffffffffffff0891945197109099f9ffffff0000a9', _test_decode = [
{ "alpha_id": "SMSC", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" },
"call_number": "" },
"tp_sc_addr": { "length": 8, "ton_npi": { "ext": True, "type_of_number": "international",
"numbering_plan_id": "isdn_e164" },
"call_number": "4915790109999" },
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7', ( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True, { "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": False }, "tp_pid": True, "tp_dcs": True, "tp_vp": True },
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension", "tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" }, "numbering_plan_id": "reserved_for_extension" },
"call_number": "" }, "call_number": "" },
@@ -287,26 +277,12 @@ class EF_SMSP(LinFixedEF):
else: else:
raise ValueError raise ValueError
@staticmethod
def sc_addr_len(ctx):
"""Compute the length field for an address field (like TP-DestAddr or TP-ScAddr)."""
if not hasattr(ctx, 'call_number') or len(ctx.call_number) == 0:
return 0xff
else:
return bytes_for_nibbles(len(ctx.call_number)) + 1
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs): def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs) super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
ScAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.sc_addr_len(ctx)), ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))), self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
'parameter_indicators'/InvertAdapter(BitStruct( 'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
Const(7, BitsInteger(3)), tp_pid=3, tp_dcs=4, tp_vp=5)),
'tp_vp'/Flag,
'tp_dcs'/Flag,
'tp_pid'/Flag,
'tp_sc_addr'/Flag,
'tp_dest_addr'/Flag)),
'tp_dest_addr'/ScAddr, 'tp_dest_addr'/ScAddr,
'tp_sc_addr'/ScAddr, 'tp_sc_addr'/ScAddr,
@@ -661,12 +637,12 @@ class EF_AD(TransparentEF):
# TS 51.011 Section 10.3.20 / 10.3.22 # TS 51.011 Section 10.3.20 / 10.3.22
class EF_VGCS(TransRecEF): class EF_VGCS(TransRecEF):
_test_de_encode = [ _test_de_encode = [
( "92f9ffff", "299" ), ( "92f9ffff", "299fffff" ),
] ]
def __init__(self, fid='6fb1', sfid=None, name='EF.VGCS', size=(4, 200), rec_len=4, def __init__(self, fid='6fb1', sfid=None, name='EF.VGCS', size=(4, 200), rec_len=4,
desc='Voice Group Call Service', **kwargs): desc='Voice Group Call Service', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len, **kwargs) super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len, **kwargs)
self._construct = PaddedBcdAdapter(Rpad(Bytes(4))) self._construct = BcdAdapter(Bytes(4))
# TS 51.011 Section 10.3.21 / 10.3.23 # TS 51.011 Section 10.3.21 / 10.3.23
class EF_VGCSS(TransparentEF): class EF_VGCSS(TransparentEF):

View File

@@ -526,13 +526,6 @@ def expand_hex(hexstring, length):
# no change # no change
return hexstring return hexstring
def bytes_for_nibbles(num_nibbles: int) -> int:
"""compute the number of bytes needed to store the given number of nibbles."""
n_bytes = num_nibbles // 2
if num_nibbles & 1:
n_bytes += 1
return n_bytes
def boxed_heading_str(heading, width=80): def boxed_heading_str(heading, width=80):
"""Generate a string that contains a boxed heading.""" """Generate a string that contains a boxed heading."""
@@ -631,17 +624,15 @@ def decomposeATR(atr_txt):
Returns: Returns:
dictionary of field and values dictionary of field and values
Example:: >>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
{ 'T0': {'value': 167},
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52") 'TB': {1: {'value': 0}},
{ 'T0': {'value': 167}, 'TC': {2: {'value': 24}},
'TB': {1: {'value': 0}}, 'TD': {1: {'value': 64}},
'TC': {2: {'value': 24}}, 'TS': {'value': 59},
'TD': {1: {'value': 64}}, 'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
'TS': {'value': 59}, 'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82], 'hbn': 7}
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'hbn': 7}
""" """
ATR_PROTOCOL_TYPE_T0 = 0 ATR_PROTOCOL_TYPE_T0 = 0
atr_txt = normalizeATR(atr_txt) atr_txt = normalizeATR(atr_txt)

View File

@@ -5,7 +5,7 @@ cmd2>=2.6.2,<3.0
jsonpath-ng jsonpath-ng
construct>=2.10.70 construct>=2.10.70
bidict bidict
pyosmocom>=0.0.12 pyosmocom>=0.0.9
pyyaml>=5.1 pyyaml>=5.1
termcolor termcolor
colorlog colorlog
@@ -15,4 +15,4 @@ git+https://github.com/osmocom/asn1tools
packaging packaging
git+https://github.com/hologram-io/smpp.pdu git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
smpplib psycopg2-binary

View File

@@ -25,7 +25,7 @@ setup(
"jsonpath-ng", "jsonpath-ng",
"construct >= 2.10.70", "construct >= 2.10.70",
"bidict", "bidict",
"pyosmocom >= 0.0.12", "pyosmocom >= 0.0.9",
"pyyaml >= 5.1", "pyyaml >= 5.1",
"termcolor", "termcolor",
"colorlog", "colorlog",
@@ -34,6 +34,7 @@ setup(
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu", "smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
"asn1tools", "asn1tools",
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted", "smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
"psycopg2-binary"
], ],
scripts=[ scripts=[
'pySim-prog.py', 'pySim-prog.py',
@@ -55,10 +56,6 @@ setup(
"service-identity", "service-identity",
"pyopenssl", "pyopenssl",
"requests", "requests",
"smpplib",
],
"CardKeyProviderPgsql": [
"psycopg2-binary",
] ]
}, },
) )

View File

@@ -2200,9 +2200,9 @@ update_record 6 fe0112ffb53e96e5ff99731d51ad7beafd0e23ffffffffffffffffffffffffff
update_record 7 fe02101da012f436d06824ecdd15050419ff9affffffffffffffffffffffffffffffff update_record 7 fe02101da012f436d06824ecdd15050419ff9affffffffffffffffffffffffffffffff
update_record 8 fe02116929a373388ac904aff57ff57f6b3431ffffffffffffffffffffffffffffffff update_record 8 fe02116929a373388ac904aff57ff57f6b3431ffffffffffffffffffffffffffffffff
update_record 9 fe0212a99245a5dc814e2f4c1aa908e9946e03ffffffffffffffffffffffffffffffff update_record 9 fe0212a99245a5dc814e2f4c1aa908e9946e03ffffffffffffffffffffffffffffffff
update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111 update_record 10 fe0310521312c05a9aea93d70d44405172a580ffffffffffffffffffffffffffffffff
update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222 update_record 11 fe0311a9e45c72d45abde7db74261ee0c11b1bffffffffffffffffffffffffffffffff
update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333 update_record 12 fe0312867ba36b5873d60ea8b2cdcf3c0ddddaffffffffffffffffffffffffffffffff
# #
################################################################################ ################################################################################
# MF/DF.SYSTEM/EF.SIM_AUTH_COUNTER # # MF/DF.SYSTEM/EF.SIM_AUTH_COUNTER #

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000111
GID1: ffffffffffffffff GID1: ffffffffffffffff
GID2: ffffffffffffffff GID2: ffffffffffffffff
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSC: 0015555
SPN: Fairwaves SPN: Fairwaves
Show in HPLMN: False Show in HPLMN: False
Hide in OPLMN: False Hide in OPLMN: False

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82. GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82. GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSC: 0015555
SPN: wavemobile SPN: wavemobile
Show in HPLMN: False Show in HPLMN: False
Hide in OPLMN: False Hide in OPLMN: False

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404. GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404. GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000 SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic SPN: Magic
Show in HPLMN: True Show in HPLMN: True
Hide in OPLMN: False Hide in OPLMN: False

View File

@@ -2,7 +2,7 @@
# Utility to verify the functionality of pySim-prog.py # Utility to verify the functionality of pySim-prog.py
# #
# (C) 2018 by sysmocom - s.f.m.c. GmbH # (C) 2018 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier # Author: Philipp Maier

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000 SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic SPN: Magic
Show in HPLMN: True Show in HPLMN: True
Hide in OPLMN: True Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000 SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic SPN: Magic
Show in HPLMN: True Show in HPLMN: True
Hide in OPLMN: True Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000 SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic SPN: Magic
Show in HPLMN: True Show in HPLMN: True
Hide in OPLMN: True Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404. GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404. GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000 SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Not available SPN: Not available
Show in HPLMN: False Show in HPLMN: False
Hide in OPLMN: False Hide in OPLMN: False

View File

@@ -15,7 +15,7 @@
}, },
{ {
"profile_info": { "profile_info": {
"iccid": "8949449999999990031", "iccid": "8949449999999990031f",
"isdp_aid": "a0000005591010ffffffff8900001200", "isdp_aid": "a0000005591010ffffffff8900001200",
"profile_state": "disabled", "profile_state": "disabled",
"service_provider_name": "OsmocomSPN", "service_provider_name": "OsmocomSPN",

View File

@@ -23,7 +23,7 @@ import os
import json import json
from utils import * from utils import *
# This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031) # This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031f)
# installed, and in disabled state. Also the profile must be installed in such a way that notifications are # installed, and in disabled state. Also the profile must be installed in such a way that notifications are
# generated when the profile is disabled or enabled (ProfileMetadata) # generated when the profile is disabled or enabled (ProfileMetadata)
@@ -56,7 +56,7 @@ class test_case(UnittestUtils):
self.runPySimShell(cardname, "test_enable_disable_profile.script") self.runPySimShell(cardname, "test_enable_disable_profile.script")
self.assertEqualFiles("enable_disable_profile.tmp") self.assertEqualFiles("enable_disable_profile.tmp")
def test_set_nickname(self): def test_enable_disable_profile(self):
cardname = 'sysmoEUICC1-C2T' cardname = 'sysmoEUICC1-C2T'
self.runPySimShell(cardname, "test_set_nickname.script") self.runPySimShell(cardname, "test_set_nickname.script")

View File

@@ -3,9 +3,6 @@ set echo true
select ADF.ISD-R select ADF.ISD-R
# Ensure that the test-profile we intend to test with is actually enabled
enable_profile --iccid 89000123456789012341
# by ICCID (pre-installed test profile on sysmoEUICC1-C2T) # by ICCID (pre-installed test profile on sysmoEUICC1-C2T)
disable_profile --iccid 89000123456789012341 > enable_disable_profile.tmp disable_profile --iccid 89000123456789012341 > enable_disable_profile.tmp
enable_profile --iccid 89000123456789012341 >> enable_disable_profile.tmp enable_profile --iccid 89000123456789012341 >> enable_disable_profile.tmp

View File

@@ -3,11 +3,6 @@ set echo true
select ADF.ISD-R select ADF.ISD-R
# Ensure that the test-profile is actually enabled. (In case te test-profile
# was disabled, a notification may be generated. The testcase should tolerate
# that)
enable_profile --iccid 89000123456789012341
# Generate two (additional) notifications by quickly enabeling the test profile # Generate two (additional) notifications by quickly enabeling the test profile
enable_profile --iccid 8949449999999990031 enable_profile --iccid 8949449999999990031f
enable_profile --iccid 89000123456789012341 enable_profile --iccid 89000123456789012341

View File

@@ -1,10 +1,5 @@
set debug true set debug true
set echo true set echo true
# The output of get_profiles_info will also include the "profile_state", which
# can be either "enabled" or "disabled". Ensure that the correct profile is
# enabled.
enable_profile --iccid 89000123456789012341
select ADF.ISD-R select ADF.ISD-R
get_profiles_info > get_profiles_info.tmp get_profiles_info > get_profiles_info.tmp

View File

@@ -19,7 +19,7 @@
"type_of_number": "reserved_for_extension", "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" "numbering_plan_id": "reserved_for_extension"
}, },
"dialing_nr": "1234567", "dialing_nr": "123456",
"cap_conf_id": 42, "cap_conf_id": 42,
"ext4_record_id": 23 "ext4_record_id": 23
}, },
@@ -67,7 +67,7 @@
"type_of_number": "reserved_for_extension", "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" "numbering_plan_id": "reserved_for_extension"
}, },
"dialing_nr": "1234567", "dialing_nr": "123456",
"cap_conf_id": 42, "cap_conf_id": 42,
"ext4_record_id": 23 "ext4_record_id": 23
}, },
@@ -127,7 +127,7 @@
"type_of_number": "reserved_for_extension", "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" "numbering_plan_id": "reserved_for_extension"
}, },
"dialing_nr": "1234567", "dialing_nr": "123456",
"cap_conf_id": 42, "cap_conf_id": 42,
"ext4_record_id": 23 "ext4_record_id": 23
} }
@@ -140,7 +140,7 @@
"type_of_number": "reserved_for_extension", "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" "numbering_plan_id": "reserved_for_extension"
}, },
"dialing_nr": "1234567", "dialing_nr": "123456",
"cap_conf_id": 42, "cap_conf_id": 42,
"ext4_record_id": 23 "ext4_record_id": 23
} }

View File

@@ -1,216 +0,0 @@
#!/bin/bash
# Utility to verify the functionality of pySim-trace.py
#
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
PYSIM_SHELL=./pySim-shell.py
PYSIM_SHELL_LOG=./pySim-shell.log
PYSIM_SMPP2SIM=./pySim-smpp2sim.py
PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log
PYSIM_SMPP2SIM_PORT=2775
PYSIM_SMPP2SIM_TIMEOUT=10
PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py
PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log
function dump_logs {
echo ""
echo "$PYSIM_SMPPOTATOOL_LOG"
echo "------------8<------------"
cat $PYSIM_SMPPOTATOOL_LOG
echo "------------8<------------"
echo ""
echo "$PYSIM_SMPP2SIM_LOG"
echo "------------8<------------"
cat $PYSIM_SMPP2SIM_LOG
echo "------------8<------------"
}
function send_test_request {
echo ""
echo "Sending request to SMPP server:"
C_APDU=$1
R_APDU_EXPECTED=$2
echo "Sending: $C_APDU"
COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --kic-idx $KEY_INDEX --kid-idx $KEY_INDEX --algo-crypt $ALGO_CRYPT --algo-auth $ALGO_AUTH --tar $TAR --apdu $C_APDU"
echo "Commandline: $COMMANDLINE"
R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG`
if [ $? -ne 0 ]; then
echo "Unable to send request! -- failed!"
dump_logs
exit 1
fi
echo ""
echo "Got response from SMPP server:"
echo "Sent: $C_APDU"
echo "Received: $R_APDU"
echo "Expected: $R_APDU_EXPECTED"
if [ "$R_APDU" != "$R_APDU_EXPECTED" ]; then
echo "Response does not match the expected response! -- failed!"
dump_logs
exit 1
fi
echo "Response matches the expected response -- success!"
}
function start_smpp_server {
PCSC_READER=$1
echo ""
echo "Starting SMPP server:"
# Start the SMPP server
COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port $PYSIM_SMPP2SIM_PORT --apdu-trace"
echo "Commandline: $COMMANDLINE"
$COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 &
PYSIM_SMPP2SIM_PID=$!
trap 'kill $PYSIM_SMPP2SIM_PID' EXIT
echo "SMPP server started (PID=$PYSIM_SMPP2SIM_PID)"
# Wait until the SMPP server is reachable
RC=1
RETRY_COUNT=0
while [ $RC -ne 0 ]; do
nc -z localhost $PYSIM_SMPP2SIM_PORT
RC=$?
((RETRY_COUNT++))
if [ $RETRY_COUNT -gt $PYSIM_SMPP2SIM_TIMEOUT ]; then
echo "SMPP server not reachable (port=$PYSIM_SMPP2SIM_PORT) -- abort"
dump_logs
exit 1
fi
sleep 1
done
echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)"
}
function stop_smpp_server {
echo ""
echo "Stopping SMPP server:"
kill $PYSIM_SMPP2SIM_PID
echo "SMPP server stopped (PID=$PYSIM_SMPP2SIM_PID)"
trap EXIT
}
function find_card_by_iccid_or_eid {
ICCID=$1
EID=$2
echo ""
echo "Searching for card:"
echo "ICCID: \"$ICCID\""
if [ -n "$EID" ]; then
echo "EID: \"$EID\""
fi
# Determine number of available PCSC readers
PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
# In case an EID is set, search for a card with that EID first
if [ -n "$EID" ]; then
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
echo "probing card (eID) in reader $PCSC_READER ..."
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "get_eid" 2> /dev/null | tail -3`
echo $RESULT_JSON | grep $EID > /dev/null
if [ $? -eq 0 ]; then
echo "Found card (eID) in reader $PCSC_READER"
return $PCSC_READER
fi
done
fi
# Search for card with the given ICCID
if [ -z "$ICCID" ]; then
echo "invalid ICCID, zero length ICCID is not allowed! -- abort"
exit 1
fi
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
echo "probing card (ICCID) in reader $PCSC_READER ..."
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
echo $RESULT_JSON | grep $ICCID > /dev/null
if [ $? -eq 0 ]; then
echo "Found card (by ICCID) in reader $PCSC_READER"
return $PCSC_READER
fi
done
echo "Card not found -- abort"
exit 1
}
function enable_profile {
PCSC_READER=$1
ICCID=$2
EID=$3
if [ -z "$EID" ]; then
# This is no eUICC, nothing to enable
return 0
fi
# Check if the profile is already enabled
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
ICCID_ENABLED=`echo $RESULT_JSON | jq -r '.iccid'`
if [ $ICCID != $ICCID_ENABLED ]; then
# Disable the currentle enabled profile
echo ""
echo "Disabeling currently enabled profile:"
echo "ICCID: \"$ICCID\""
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "disable_profile --iccid $ICCID_ENABLED" 2> /dev/null | tail -3`
echo $RESULT_JSON | grep "ok" > /dev/null
if [ $? -ne 0 ]; then
echo "unable to disable profile with \"$ICCID_ENABLED\""
exit 1
fi
echo "profile disabled"
# Enable the profile we intend to test with
echo ""
echo "Enabeling profile:"
echo "ICCID: \"$ICCID\""
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "enable_profile --iccid $ICCID" 2> /dev/null | tail -3`
echo $RESULT_JSON | grep "ok\|profileNotInDisabledState" > /dev/null
if [ $? -ne 0 ]; then
echo "unable to enable profile with \"$ICCID\""
exit 1
fi
echo "profile enabled"
fi
}
export PYTHONPATH=./
echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py"
echo "=============================================================="
TESTCASE_DIR=`dirname $0`
for TEST_CONFIG_FILE in $TESTCASE_DIR/testcase_*.cfg ; do
echo ""
echo "running testcase: $TEST_CONFIG_FILE"
. $TEST_CONFIG_FILE
find_card_by_iccid_or_eid $ICCID $EID
PCSC_READER=$?
enable_profile $PCSC_READER $ICCID $EID
start_smpp_server $PCSC_READER
send_test_request $APDU "$EXPECTED_RESPONSE"
stop_smpp_server
echo ""
echo "testcase ok"
echo "--------------------------------------------------------------"
done
echo "done."

View File

@@ -1,17 +0,0 @@
# Preparation:
# This testcase executes against a sysmoISIM-SJA5 card. For the testcase, the
# key configuration on the card may be used as it is.
# Card parameter:
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
EID=""
KIC='51D4FC44BCBA7C4589DFADA3297720AF' # <-- change to the KIC1 of your card!
KID='0449699C472CE71E2FB7B56245EF7684' # <-- change to the KID1 of your card!
KEY_INDEX=1
ALGO_CRYPT=triple_des_cbc2
ALGO_AUTH=triple_des_cbc2
TAR='B00010'
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
APDU='A0A40000027F20A0C0000016'
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'

View File

@@ -1,19 +0,0 @@
# Preparation:
# This testcase executes against a sysmoEUICC1-C2T, which is equipped with the
# TS48V1-B-UNIQUE test profile from https://test.rsp.sysmocom.de/ (Activation
# code: 1$smdpp.test.rsp.sysmocom.de$TS48V1-B-UNIQUE). This testprofile must be
# present on the eUICC before this testcase can be executed.
# Card parameter:
ICCID="8949449999999990031"
EID="89049044900000000000000000102355" # <-- change to the EID of your card!
KIC='66778899aabbccdd1122334455eeff10'
KID='112233445566778899aabbccddeeff10'
KEY_INDEX=2
ALGO_CRYPT=aes_cbc
ALGO_AUTH=aes_cmac
TAR='b00120'
# Testcase: Send OTA-SMS that selects DF.ICCID and returns the select response
APDU='00a40004022fe200C000001d'
EXPECTED_RESPONSE='621b8202412183022fe2a503d001408a01058b032f06038002000a8800 9000'

View File

@@ -1,28 +0,0 @@
# Preparation:
# This testcase executes against a sysmoISIM-SJA5 card. Since this card model is
# shipped with a classic DES key configuration, it is necessary to provision
# AES128 test keys before this testcase may be executed. The the following
# pySim-shell command sequence may be used:
#
# verify_adm 34173960 # <-- change to the ADM key of your card!
# select /DF.SYSTEM/EF.0348_KEY
# update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111
# update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222
# update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333
#
# This overwrites one of the already existing 3DES SCP02 key (KVN 47) and replaces it
# with an AES256 SCP80 key (KVN 3).
# Card parameter:
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
EID=""
KIC='1111111111111111111111111111111111111111111111111111111111111111'
KID='2222222222222222222222222222222222222222222222222222222222222222'
KEY_INDEX=3
ALGO_CRYPT=aes_cbc
ALGO_AUTH=aes_cmac
TAR='B00010'
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
APDU='A0A40000027F20A0C0000016'
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'

View File

@@ -2,7 +2,7 @@
# Utility to verify the functionality of pySim-trace.py # Utility to verify the functionality of pySim-trace.py
# #
# (C) 2023 by sysmocom - s.f.m.c. GmbH # (C) 2023 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier # Author: Philipp Maier

View File

@@ -143,7 +143,7 @@ CardReset(3b9f96801f878031e073fe211b674a4c753034054ba9)
=============================== ===============================
00 SEARCH RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [2], "sw": "9000"}} 00 SEARCH RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [2], "sw": "9000"}}
=============================== ===============================
00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_vp": true, "tp_dcs": true, "tp_pid": true, "tp_sc_addr": true, "tp_dest_addr": false}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5} 00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_dest_addr": false, "tp_sc_addr": true, "tp_pid": true, "tp_dcs": true, "tp_vp": true}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555f"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5}
=============================== ===============================
00 SEARCH RECORD MF/ADF.USIM/EF.SMS 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "00ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], "sw": "9000"}} 00 SEARCH RECORD MF/ADF.USIM/EF.SMS 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "00ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], "sw": "9000"}}
=============================== ===============================

View File

@@ -1 +0,0 @@
../../smdpp-data

View File

@@ -1,451 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: Neels Hofmeyr
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import io
import sys
import unittest
import io
from importlib import resources
from osmocom.utils import hexstr
from pySim.esim.saip import ProfileElementSequence
import pySim.esim.saip.personalization as p13n
import smdpp_data.upp
import xo
update_expected_output = False
def valstr(val):
if isinstance(val, io.BytesIO):
val = val.getvalue()
if isinstance(val, bytearray):
val = bytes(val)
return f'{val!r}'
def valtypestr(val):
if isinstance(val, dict):
types = []
for v in val.values():
types.append(f'{type(v).__name__}')
val_type = '{' + ', '.join(types) + '}'
else:
val_type = f'{type(val).__name__}'
return f'{valstr(val)}:{val_type}'
class ConfigurableParameterTest(unittest.TestCase):
def test_parameters(self):
upp_fnames = (
'TS48v5_SAIP2.1A_NoBERTLV.der',
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
'TS48v5_SAIP2.1B_NoBERTLV.der',
'TS48v5_SAIP2.3_NoBERTLV.der',
)
class Paramtest:
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
self.param_cls = param_cls
self.val = val
self.expect_clean_val = expect_clean_val
self.expect_val = expect_val
param_tests = [
Paramtest(param_cls=p13n.Imsi, val='123456',
expect_clean_val=str('123456'),
expect_val={'IMSI': hexstr('123456'),
'IMSI-ACC': '0040'}),
Paramtest(param_cls=p13n.Imsi, val=int(123456),
expect_val={'IMSI': hexstr('123456'),
'IMSI-ACC': '0040'}),
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
expect_clean_val=str('123456789012345'),
expect_val={'IMSI': hexstr('123456789012345'),
'IMSI-ACC': '0020'}),
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
expect_val={'IMSI': hexstr('123456789012345'),
'IMSI-ACC': '0020'}),
Paramtest(param_cls=p13n.Puk1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Puk1,
val=int(12345678),
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Puk2,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Pin1,
val='1234',
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Pin1,
val='123456',
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Pin1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Pin1,
val=int(1234),
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Pin1,
val=int(123456),
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Pin1,
val=int(12345678),
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Adm1,
val='1234',
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Adm1,
val='123456',
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Adm1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Adm1,
val=int(123456),
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.AlgorithmID,
val='Milenage',
expect_clean_val=1,
expect_val='Milenage'),
Paramtest(param_cls=p13n.AlgorithmID,
val='TUAK',
expect_clean_val=2,
expect_val='TUAK'),
Paramtest(param_cls=p13n.AlgorithmID,
val='usim-test',
expect_clean_val=3,
expect_val='usim-test'),
Paramtest(param_cls=p13n.AlgorithmID,
val=1,
expect_clean_val=1,
expect_val='Milenage'),
Paramtest(param_cls=p13n.AlgorithmID,
val=2,
expect_clean_val=2,
expect_val='TUAK'),
Paramtest(param_cls=p13n.AlgorithmID,
val=3,
expect_clean_val=3,
expect_val='usim-test'),
Paramtest(param_cls=p13n.K,
val='01020304050607080910111213141516',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=int(11020304050607080910111213141516),
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='11020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val='01020304050607080910111213141516',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.SmspTpScAddr,
val='+1234567',
expect_clean_val=(True, '1234567'),
expect_val='+1234567'),
Paramtest(param_cls=p13n.SmspTpScAddr,
val=1234567,
expect_clean_val=(False, '1234567'),
expect_val='1234567'),
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
val='123',
expect_clean_val=123,
expect_val='123'),
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
val=123,
expect_clean_val=123,
expect_val='123'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val='0a 0b 0c 01 02',
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val=b'\x0a\x0b\x0c\x01\x02',
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val=bytearray(b'\x0a\x0b\x0c\x01\x02'),
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageXoringConstants,
val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
' bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
' cccccccccccccccccccccccccccccccc'
' 11111111111111111111111111111111'
' 22222222222222222222222222222222',
expect_clean_val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
'cccccccccccccccccccccccccccccccc'
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
Paramtest(param_cls=p13n.MilenageXoringConstants,
val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_clean_val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
'cccccccccccccccccccccccccccccccc'
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
]
for sdkey_cls in (
# thin out the number of tests, as a compromise between completeness and test runtime
p13n.SdKeyScp02Kvn20AesDek,
#p13n.SdKeyScp02Kvn20AesEnc,
#p13n.SdKeyScp02Kvn20AesMac,
#p13n.SdKeyScp02Kvn21AesDek,
p13n.SdKeyScp02Kvn21AesEnc,
#p13n.SdKeyScp02Kvn21AesMac,
#p13n.SdKeyScp02Kvn22AesDek,
#p13n.SdKeyScp02Kvn22AesEnc,
p13n.SdKeyScp02Kvn22AesMac,
#p13n.SdKeyScp02KvnffAesDek,
#p13n.SdKeyScp02KvnffAesEnc,
#p13n.SdKeyScp02KvnffAesMac,
p13n.SdKeyScp03Kvn30AesDek,
#p13n.SdKeyScp03Kvn30AesEnc,
#p13n.SdKeyScp03Kvn30AesMac,
#p13n.SdKeyScp03Kvn31AesDek,
p13n.SdKeyScp03Kvn31AesEnc,
#p13n.SdKeyScp03Kvn31AesMac,
#p13n.SdKeyScp03Kvn32AesDek,
#p13n.SdKeyScp03Kvn32AesEnc,
p13n.SdKeyScp03Kvn32AesMac,
#p13n.SdKeyScp80Kvn01AesDek,
#p13n.SdKeyScp80Kvn01AesEnc,
#p13n.SdKeyScp80Kvn01AesMac,
p13n.SdKeyScp80Kvn01DesDek,
#p13n.SdKeyScp80Kvn01DesEnc,
#p13n.SdKeyScp80Kvn01DesMac,
#p13n.SdKeyScp80Kvn02AesDek,
p13n.SdKeyScp80Kvn02AesEnc,
#p13n.SdKeyScp80Kvn02AesMac,
#p13n.SdKeyScp80Kvn02DesDek,
#p13n.SdKeyScp80Kvn02DesEnc,
p13n.SdKeyScp80Kvn02DesMac,
#p13n.SdKeyScp80Kvn03AesDek,
#p13n.SdKeyScp80Kvn03AesEnc,
#p13n.SdKeyScp80Kvn03AesMac,
p13n.SdKeyScp80Kvn03DesDek,
#p13n.SdKeyScp80Kvn03DesEnc,
#p13n.SdKeyScp80Kvn03DesMac,
#p13n.SdKeyScp81Kvn40AesDek,
p13n.SdKeyScp81Kvn40DesDek,
#p13n.SdKeyScp81Kvn40Tlspsk,
#p13n.SdKeyScp81Kvn41AesDek,
#p13n.SdKeyScp81Kvn41DesDek,
p13n.SdKeyScp81Kvn41Tlspsk,
#p13n.SdKeyScp81Kvn42AesDek,
#p13n.SdKeyScp81Kvn42DesDek,
#p13n.SdKeyScp81Kvn42Tlspsk,
):
for key_len in sdkey_cls.allow_len:
val = '0102030405060708091011121314151617181920212223242526272829303132'
expect_clean_val = (b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32')
expect_val = '0102030405060708091011121314151617181920212223242526272829303132'
val = val[:key_len*2]
expect_clean_val = expect_clean_val[:key_len]
expect_val = val
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test bytes input
val = expect_clean_val
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test bytearray input
val = bytearray(expect_clean_val)
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test BytesIO input
val = io.BytesIO(expect_clean_val)
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
if key_len == 16:
# test huge integer input.
# needs to start with nonzero.. stupid
val = 11020304050607080910111213141516
expect_clean_val = (b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16')
expect_val = '11020304050607080910111213141516'
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
outputs = []
for upp_fname in upp_fnames:
test_idx = -1
try:
der = resources.read_binary(smdpp_data.upp, upp_fname)
for t in param_tests:
test_idx += 1
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
param = None
try:
param = t.param_cls()
param.input_value = t.val
param.validate()
except ValueError as e:
raise ValueError(f'{logloc}: {e}') from e
clean_val = param.value
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
raise ValueError(f'{logloc}: expected'
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
# pes = copy.deepcopy(orig_pes)
pes = ProfileElementSequence.from_der(der)
try:
param.apply(pes)
except ValueError as e:
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
changed_der = pes.to_der()
pes2 = ProfileElementSequence.from_der(changed_der)
read_back_val = t.param_cls.get_value_from_pes(pes2)
# compose log string to show the precise type of dict values
if isinstance(read_back_val, dict):
types = set()
for v in read_back_val.values():
types.add(f'{type(v).__name__}')
read_back_val_type = '{' + ', '.join(types) + '}'
else:
read_back_val_type = f'{type(read_back_val).__name__}'
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
expect_val = t.expect_val
if not isinstance(expect_val, dict):
expect_val = { t.param_cls.get_name(): expect_val }
if read_back_val != expect_val:
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
ok = logloc.replace(' clean_val', '\n\tclean_val'
).replace(' read_back_val', '\n\tread_back_val'
).replace('=', '=\t'
)
output = f'\nok: {ok}'
outputs.append(output)
print(output)
except Exception as e:
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
output = '\n'.join(outputs) + '\n'
xo_name = 'test_configurable_parameters'
if update_expected_output:
with resources.path(xo, xo_name) as xo_path:
with open(xo_path, 'w', encoding='utf-8') as f:
f.write(output)
else:
xo_str = resources.read_text(xo, xo_name)
if xo_str != output:
at = 0
while at < len(output):
if output[at] == xo_str[at]:
at += 1
continue
break
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
if __name__ == "__main__":
if '-u' in sys.argv:
update_expected_output = True
sys.argv.remove('-u')
unittest.main()

View File

@@ -21,7 +21,7 @@ import copy
from osmocom.utils import h2b, b2h from osmocom.utils import h2b, b2h
from pySim.esim.saip import * from pySim.esim.saip import *
from pySim.esim.saip import personalization from pySim.esim.saip.personalization import *
from pprint import pprint as pp from pprint import pprint as pp
@@ -55,56 +55,14 @@ class SaipTest(unittest.TestCase):
def test_personalization(self): def test_personalization(self):
"""Test some of the personalization operations.""" """Test some of the personalization operations."""
pes = copy.deepcopy(self.pes) pes = copy.deepcopy(self.pes)
params = [personalization.Puk1('01234567'), params = [Puk1('01234567'), Puk2(98765432), Pin1('1111'), Pin2(2222), Adm1('11111111'),
personalization.Puk2(98765432), K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
personalization.Pin1('1111'),
personalization.Pin2(2222),
personalization.Adm1('11111111'),
personalization.K(h2b('000102030405060708090a0b0c0d0e0f')),
personalization.Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
for p in params: for p in params:
p.validate() p.validate()
p.apply(pes) p.apply(pes)
# TODO: we don't actually test the results here, but we just verify there is no exception # TODO: we don't actually test the results here, but we just verify there is no exception
pes.to_der() pes.to_der()
def test_personalization2(self):
"""Test some of the personalization operations."""
cls = personalization.SdKeyScp80Kvn01DesEnc
pes = ProfileElementSequence.from_der(self.per_input)
prev_val = tuple(cls.get_values_from_pes(pes))
print(f'{prev_val=}')
self.assertTrue(prev_val)
set_val = '42342342342342342342342342342342'
param = cls(set_val)
param.validate()
param.apply(pes)
get_val1 = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1=} {set_val=}')
self.assertEqual(get_val1, ({cls.name: set_val},))
get_val1b = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1b=} {set_val=}')
self.assertEqual(get_val1b, ({cls.name: set_val},))
der = pes.to_der()
get_val1c = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1c=} {set_val=}')
self.assertEqual(get_val1c, ({cls.name: set_val},))
# assertTrue to not dump the entire der.
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
# DER and the ConfigurableParameter subclass is buggy.
self.assertTrue(der != self.per_input)
pes2 = ProfileElementSequence.from_der(der)
get_val2 = tuple(cls.get_values_from_pes(pes2))
print(f'{get_val2=} {set_val=}')
self.assertEqual(get_val2, ({cls.name: set_val},))
def test_constructor_encode(self): def test_constructor_encode(self):
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception.""" """Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom, for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
@@ -132,34 +90,5 @@ class OidTest(unittest.TestCase):
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0')) self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1')) self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
class NonMatchTest(unittest.TestCase):
def test_nonmatch(self):
# non-matches before, in between and after matches
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
nm_list = NonMatch.from_matchlist(match_list, 26)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
NonMatch(a=24, b=24, size=2)])
def test_nonmatch_beg(self):
# single match at beginning
match_list = [Match(a=0, b=0, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 20)
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
def test_nonmatch_end(self):
# single match at end
match_list = [Match(a=19, b=19, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
def test_nonmatch_none(self):
# no match at all
match_list = []
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -96,7 +96,7 @@ class LinFixed_Test(unittest.TestCase):
inst = c() inst = c()
encoded, rec_num, decoded = self._parse_t(t) encoded, rec_num, decoded = self._parse_t(t)
logging.debug("Testing encode of %s", name) logging.debug("Testing encode of %s", name)
re_enc = inst.encode_record_hex(decoded, rec_num, len(encoded)//2) re_enc = inst.encode_record_hex(decoded, rec_num)
self.assertEqual(encoded.upper(), re_enc.upper()) self.assertEqual(encoded.upper(), re_enc.upper())
def test_de_encode_record(self): def test_de_encode_record(self):
@@ -122,7 +122,7 @@ class LinFixed_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec) self.assertEqual(decoded, re_dec)
# re-encode the decoded data # re-encode the decoded data
logging.debug("Testing re-encode of %s", name) logging.debug("Testing re-encode of %s", name)
re_enc = inst.encode_record_hex(re_dec, rec_num, len(encoded)//2) re_enc = inst.encode_record_hex(re_dec, rec_num)
self.assertEqual(encoded.upper(), re_enc.upper()) self.assertEqual(encoded.upper(), re_enc.upper())
if hasattr(c, '_test_no_pad') and c._test_no_pad: if hasattr(c, '_test_no_pad') and c._test_no_pad:
continue continue
@@ -196,7 +196,7 @@ class TransRecEF_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec) self.assertEqual(decoded, re_dec)
# re-encode the decoded data # re-encode the decoded data
logging.debug("Testing re-encode of %s", name) logging.debug("Testing re-encode of %s", name)
re_enc = inst.encode_record_hex(re_dec, len(encoded)//2) re_enc = inst.encode_record_hex(re_dec)
self.assertEqual(encoded.upper(), re_enc.upper()) self.assertEqual(encoded.upper(), re_enc.upper())
# there's no point in testing padded input, as TransRecEF have a fixed record # there's no point in testing padded input, as TransRecEF have a fixed record
# size and we cannot ever receive more input data than that size. # size and we cannot ever receive more input data than that size.
@@ -256,8 +256,8 @@ class TransparentEF_Test(unittest.TestCase):
encoded = t[0] encoded = t[0]
decoded = t[1] decoded = t[1]
logging.debug("Testing encode of %s", name) logging.debug("Testing encode of %s", name)
re_enc = inst.encode_hex(decoded, len(encoded)//2) re_dec = inst.decode_hex(encoded)
self.assertEqual(encoded, re_enc) self.assertEqual(decoded, re_dec)
def test_de_encode_file(self): def test_de_encode_file(self):
"""Test the decoder and encoder for a transparent EF. Performs first a decoder """Test the decoder and encoder for a transparent EF. Performs first a decoder
@@ -280,7 +280,7 @@ class TransparentEF_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec) self.assertEqual(decoded, re_dec)
logging.debug("Testing re-encode of %s", name) logging.debug("Testing re-encode of %s", name)
re_dec = inst.decode_hex(encoded) re_dec = inst.decode_hex(encoded)
re_enc = inst.encode_hex(re_dec, len(encoded)//2) re_enc = inst.encode_hex(re_dec)
self.assertEqual(encoded.upper(), re_enc.upper()) self.assertEqual(encoded.upper(), re_enc.upper())
if hasattr(c, '_test_no_pad') and c._test_no_pad: if hasattr(c, '_test_no_pad') and c._test_no_pad:
continue continue

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH # (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier <pmaier@sysmocom.de> # Author: Philipp Maier <pmaier@sysmocom.de>
@@ -25,7 +25,7 @@ import io
import sys import sys
from inspect import currentframe, getframeinfo from inspect import currentframe, getframeinfo
log = PySimLogger.get(__name__) log = PySimLogger.get("TEST")
TEST_MSG_DEBUG = "this is a debug message" TEST_MSG_DEBUG = "this is a debug message"
TEST_MSG_INFO = "this is an info message" TEST_MSG_INFO = "this is an info message"
@@ -82,15 +82,15 @@ class PySimLogger_Test(unittest.TestCase):
PySimLogger.setup(self._test_print_callback) PySimLogger.setup(self._test_print_callback)
PySimLogger.set_verbose(True) PySimLogger.set_verbose(True)
frame = currentframe() frame = currentframe()
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- DEBUG: " + TEST_MSG_DEBUG expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG) log.debug(TEST_MSG_DEBUG)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- INFO: " + TEST_MSG_INFO expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO) log.info(TEST_MSG_INFO)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- WARNING: " + TEST_MSG_WARNING expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING) log.warning(TEST_MSG_WARNING)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- ERROR: " + TEST_MSG_ERROR expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR) log.error(TEST_MSG_ERROR)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- CRITICAL: " + TEST_MSG_CRITICAL expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL) log.critical(TEST_MSG_CRITICAL)
def test_04_level(self): def test_04_level(self):

View File

@@ -1,216 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: Neels Hofmeyr
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import math
from importlib import resources
import unittest
from pySim.esim.saip import param_source
import xo
update_expected_output = False
class D:
mandatory = set()
optional = set()
def __init__(self, **kwargs):
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
for k, v in kwargs.items():
setattr(self, k, v)
for k in self.optional:
if not hasattr(self, k):
setattr(self, k, None)
decimals = '0123456789'
hexadecimals = '0123456789abcdefABCDEF'
class FakeRandom:
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
i = 0
@classmethod
def next(cls):
cls.i = (cls.i + 1) % len(cls.vals)
return cls.vals[cls.i]
@staticmethod
def randint(a, b):
d = b - a
n_bytes = math.ceil(math.log(d, 2))
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
return a + (r % (b - a))
@staticmethod
def randbytes(n):
return bytes(FakeRandom.next() for i in range(n))
class ParamSourceTest(unittest.TestCase):
def test_param_source(self):
class ParamSourceTest(D):
mandatory = (
'param_source',
'n',
'expect',
)
optional = (
'expect_arg',
'csv_rows',
)
def expect_const(t, vals):
return tuple(t.expect_arg) == tuple(vals)
def expect_random(t, vals):
chars = t.expect_arg.get('digits')
repetitions = (t.n - len(set(vals)))
if repetitions:
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
for val_i in range(len(vals)):
v = vals[val_i]
val_minlen = t.expect_arg.get('val_minlen')
val_maxlen = t.expect_arg.get('val_maxlen')
if len(v) < val_minlen or len(v) > val_maxlen:
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
f' {val_minlen}..{val_maxlen}')
if chars is not None and not all(c in chars for c in v):
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
return True
param_source_tests = [
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
n=3,
expect=expect_const,
expect_arg=('123', '123', '123')
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
n=3,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 5,
'val_maxlen': 5,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 1,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 3,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
n=3,
expect=expect_const,
expect_arg=('10001', '10002', '10003')
),
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
n=3,
expect=expect_const,
expect_arg=('first val', 'second val', 'third val'),
csv_rows=(
{'column_name': 'first val',},
{'column_name': 'second val',},
{'column_name': 'third val',},
)
),
]
outputs = []
for t in param_source_tests:
try:
if hasattr(t.param_source, 'random_impl'):
t.param_source.random_impl = FakeRandom
vals = []
for i in range(t.n):
csv_row = None
if t.csv_rows is not None:
csv_row = t.csv_rows[i]
vals.append( t.param_source.get_next(csv_row=csv_row) )
if not t.expect(t, vals):
raise RuntimeError(f'invalid values returned: returned {vals}')
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
outputs.append(output)
print(output)
except RuntimeError as e:
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
output = '\n'.join(outputs) + '\n'
xo_name = 'test_param_src'
if update_expected_output:
with resources.path(xo, xo_name) as xo_path:
with open(xo_path, 'w', encoding='utf-8') as f:
f.write(output)
else:
xo_str = resources.read_text(xo, xo_name)
if xo_str != output:
at = 0
while at < len(output):
if output[at] == xo_str[at]:
at += 1
continue
break
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
if __name__ == "__main__":
if '-u' in sys.argv:
update_expected_output = True
sys.argv.remove('-u')
unittest.main()

View File

@@ -79,30 +79,6 @@ class DecTestCase(unittest.TestCase):
def testDecMNCfromPLMN_unused_str(self): def testDecMNCfromPLMN_unused_str(self):
self.assertEqual(utils.dec_mnc_from_plmn_str("00f0ff"), "") self.assertEqual(utils.dec_mnc_from_plmn_str("00f0ff"), "")
def testEncImsi(self):
#Type IMSI, odd number of identity digits
self.assertEqual(utils.enc_imsi("228062800000208"), "082982608200002080")
self.assertEqual(utils.enc_imsi("001010000123456"), "080910100000214365")
self.assertEqual(utils.enc_imsi("0010100001234"), "0709101000002143ff")
#Type IMSI, even number of identity digits
self.assertEqual(utils.enc_imsi("22806280000028"), "0821826082000020f8")
self.assertEqual(utils.enc_imsi("00101000012345"), "0801101000002143f5")
self.assertEqual(utils.enc_imsi("001010000123"), "07011010000021f3ff")
def testDecImsi(self):
#Type IMSI, odd number of identity digits
self.assertEqual(utils.dec_imsi("082982608200002080"), "228062800000208")
self.assertEqual(utils.dec_imsi("080910100000214365"), "001010000123456")
self.assertEqual(utils.dec_imsi("0709101000002143ff"), "0010100001234")
self.assertEqual(utils.dec_imsi("0709101000002143"), "0010100001234")
#Type IMSI, even number of identity digits
self.assertEqual(utils.dec_imsi("0821826082000020f8"), "22806280000028")
self.assertEqual(utils.dec_imsi("0801101000002143f5"), "00101000012345")
self.assertEqual(utils.dec_imsi("07011010000021f3ff"), "001010000123")
self.assertEqual(utils.dec_imsi("07011010000021f3"), "001010000123")
def test_enc_plmn(self): def test_enc_plmn(self):
with self.subTest("2-digit MCC"): with self.subTest("2-digit MCC"):
self.assertEqual(utils.enc_plmn("001", "01F"), "00F110") self.assertEqual(utils.enc_plmn("001", "01F"), "00F110")

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +0,0 @@
ok: ConstantSource vals=['123', '123', '123']
ok: RandomDigitSource vals=['13987', '49298', '55670']
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
ok: IncDigitSource vals=['10001', '10002', '10003']
ok: CsvSource vals=['first val', 'second val', 'third val']