2 Commits

Author SHA1 Message Date
Philipp Maier
a3469bc03b card_key_provider: add PostgreSQL support
The Card Key Provider currently only has support for CSV files
as input. Unfortunately using CSV files does not scale very well
when the card inventory is very large and continously updated.
In this case a centralized storage in the form of a database
is the more suitable approach.

This patch adds PostgreSQL support next to the existing CSV
file support. It also adds an importer tool to import existing
CSV files into the database.

Change-Id: Icba625c02a60d7e1f519b506a46bda5ded0537d3
Related: SYS#7725
2025-12-18 15:59:31 +01:00
Philipp Maier
c118012fb9 pysim/log: also accept ANSI strings to specify the log message colors
the PySimLogger class currently only accepts cmd2 color enum values.
This is what we need for pySim-shell.py. However, in case we want to
use the PySimLogger in stand-alone programs that do not use cmd2, this
is a bit bulky. Let's add some flexibility to PySimLogger, so that we
can specify the colors as raw ANSI strings as well.

Change-Id: I93543e19649064043ae8323f82ecd8c423d1d921
Related: SYS#7725
2025-12-18 15:59:01 +01:00
68 changed files with 380 additions and 5965 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,5 @@
*.pyc
.*.sw?
.*.swp
/docs/_*
/docs/generated

View File

@@ -100,6 +100,7 @@ Please install the following dependencies:
- pyyaml >= 5.1
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
- termcolor
- psycopg2-binary
Example for Debian:
```sh

View File

@@ -1,112 +0,0 @@
#!/usr/bin/env python3
# A tool to analyze the eUICC simaResponse (series of EUICCResponse)
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from osmocom.utils import h2b, b2h
from osmocom.tlv import bertlv_parse_one, bertlv_encode_tag, bertlv_encode_len
from pySim.esim.saip import *
parser = argparse.ArgumentParser(description="""Utility program to analyze the contents of an eUICC simaResponse.""")
parser.add_argument('SIMA_RESPONSE', help='Hexstring containing the simaResponse as received from the eUICC')
def split_sima_response(sima_response):
"""split an eUICC simaResponse field into a list of EUICCResponse fields"""
remainder = sima_response
result = []
while len(remainder):
tdict, l, v, next_remainder = bertlv_parse_one(remainder)
rawtag = bertlv_encode_tag(tdict)
rawlen = bertlv_encode_len(l)
result = result + [remainder[0:len(rawtag) + len(rawlen) + l]]
remainder = next_remainder
return result
def analyze_status(status):
"""
Convert a status code (integer) into a human readable string
(see eUICC Profile Package: Interoperable Format Technical Specification, section 8.11)
"""
# SIMA status codes
string_values = {0 : 'ok',
1 : 'pe-not-supported',
2 : 'memory-failure',
3 : 'bad-values',
4 : 'not-enough-memory',
5 : 'invalid-request-format',
6 : 'invalid-parameter',
7 : 'runtime-not-supported',
8 : 'lib-not-supported',
9 : 'template-not-supported ',
10 : 'feature-not-supported',
11 : 'pin-code-missing',
31 : 'unsupported-profile-version'}
string_value = string_values.get(status, None)
if string_value is not None:
return "%d = %s (SIMA status code)" % (status, string_value)
# ISO 7816 status words
if status >= 24576 and status <= 28671:
return "%d = %04x (ISO7816 status word)" % (status, status)
elif status >= 36864 and status <= 40959:
return "%d = %04x (ISO7816 status word)" % (status, status)
# Proprietary status codes
elif status >= 40960 and status <= 65535:
return "%d = %04x (proprietary)" % (status, status)
# Unknown status codes
return "%d (unknown, proprietary?)" % status
def analyze_euicc_response(euicc_response):
"""Analyze and display the contents of an EUICCResponse"""
print(" EUICCResponse: %s" % b2h(euicc_response))
euicc_response_decoded = asn1.decode('EUICCResponse', euicc_response)
pe_status = euicc_response_decoded.get('peStatus')
print(" peStatus:")
for s in pe_status:
print(" status: %s" % analyze_status(s.get('status')))
print(" identification: %s" % str(s.get('identification', None)))
print(" additional-information: %s" % str(s.get('additional-information', None)))
print(" offset: %s" % str(s.get('offset', None)))
if euicc_response_decoded.get('profileInstallationAborted', False) is None:
# This type is defined as profileInstallationAborted NULL OPTIONAL, so when it is present it
# will have the value None, otherwise it is simply not present.
print(" profileInstallationAborted: True")
else:
print(" profileInstallationAborted: False")
status_message = euicc_response_decoded.get('statusMessage', None)
print(" statusMessage: %s" % str(status_message))
if __name__ == '__main__':
opts = parser.parse_args()
sima_response = h2b(opts.SIMA_RESPONSE);
print("simaResponse: %s" % b2h(sima_response))
euicc_response_list = split_sima_response(sima_response)
for euicc_response in euicc_response_list:
analyze_euicc_response(euicc_response)

View File

@@ -1,28 +1,9 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import logging
import csv
import sys
import os
import yaml
import psycopg2
from psycopg2.sql import Identifier, SQL
@@ -30,7 +11,7 @@ from pathlib import Path
from pySim.log import PySimLogger
from packaging import version
log = PySimLogger.get(Path(__file__).stem)
log = PySimLogger.get("CSV2PGQSL")
class CardKeyDatabase:
def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False):
@@ -53,7 +34,8 @@ class CardKeyDatabase:
raise ValueError("user for role '%s' not set up in config file." % role)
return user.get('name'), user.get('pass')
self.table = table_name.lower()
log = PySimLogger.get("PQSQL")
self.table = table_name
self.cols = None
# Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys".
@@ -113,7 +95,7 @@ class CardKeyDatabase:
"""
# Create table columns with primary key
query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table),
query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table.lower()),
Identifier(cols[0].lower()))
for c in cols[1:]:
query += SQL(", {} VARCHAR").format(Identifier(c.lower()))
@@ -123,16 +105,16 @@ class CardKeyDatabase:
# Create indexes for all other columns
for c in cols[1:]:
self.cur.execute(query = SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()),
Identifier(self.table),
Identifier(self.table.lower()),
Identifier(c.lower())))
# Set permissions
self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table),
self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_importer)))
self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table),
self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_reader)))
log.info("New database table created: %s", self.table)
log.info("New database table created: %s", str(self.table.lower()))
def get_cols(self) -> list[str]:
"""
@@ -147,7 +129,7 @@ class CardKeyDatabase:
return self.cols
# Request a list of current cols from the database
self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table,))
self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table.lower(),))
cols_result = self.cur.fetchall()
cols = []
@@ -190,7 +172,7 @@ class CardKeyDatabase:
# Add the missing columns to the table
self.cols = None
for c in cols_missing:
self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table),
self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table.lower()),
Identifier(c.lower())))
def insert_row(self, row:dict[str, str]):
@@ -211,7 +193,7 @@ class CardKeyDatabase:
# Insert row into datbase table
row_keys = list(row.keys())
row_values = list(row.values())
query = SQL("INSERT INTO {} ").format(Identifier(self.table))
query = SQL("INSERT INTO {} ").format(Identifier(self.table.lower()))
query += SQL("({} ").format(Identifier(row_keys[0].lower()))
for k in row_keys[1:]:
query += SQL(", {}").format(Identifier(k.lower()))
@@ -237,7 +219,7 @@ def open_csv(opts: argparse.Namespace):
def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase:
try:
db = CardKeyDatabase(os.path.expanduser(opts.pgsql), opts.table_name, opts.create_table, opts.admin)
db = CardKeyDatabase(opts.pqsql, opts.table_name, opts.create_table, opts.admin)
# Check CSV format against table schema, add missing columns
cols_missing = db.get_missing_cols(cr.fieldnames)
@@ -275,8 +257,8 @@ if __name__ == '__main__':
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
option_parser.add_argument('--pgsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg",
option_parser.add_argument('--pqsql', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)')
option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True)
option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True)

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import argparse
import logging
import json
import asn1tools
import asn1tools.codecs.ber
import asn1tools.codecs.der
import pySim.esim.rsp as rsp
import pySim.esim.saip as saip
from pySim.esim.es2p import param, Es2pApiServerMno, Es2pApiServerHandlerMno
from osmocom.utils import b2h
from datetime import datetime
from analyze_simaResponse import split_sima_response
from pathlib import Path
logger = logging.getLogger(Path(__file__).stem)
parser = argparse.ArgumentParser(description="""
Utility to receive and log requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
parser.add_argument("--host", help="Host/IP to bind HTTP(S) to", default="localhost")
parser.add_argument("--port", help="TCP port to bind HTTP(S) to", default=443, type=int)
parser.add_argument('--server-cert', help='X.509 server certificate used to provide the ES2+ HTTPs service')
parser.add_argument('--client-ca-cert', help='X.509 CA certificates to authenticate the requesting client(s)')
parser.add_argument("-v", "--verbose", help="enable debug output", action='store_true', default=False)
def decode_sima_response(sima_response):
decoded = []
euicc_response_list = split_sima_response(sima_response)
for euicc_response in euicc_response_list:
decoded.append(saip.asn1.decode('EUICCResponse', euicc_response))
return decoded
def decode_result_data(result_data):
return rsp.asn1.decode('PendingNotification', result_data)
def decode(data, path="/"):
if data is None:
return 'none'
elif type(data) is datetime:
return data.isoformat()
elif type(data) is tuple:
return {str(data[0]) : decode(data[1], path + str(data[0]) + "/")}
elif type(data) is list:
new_data = []
for item in data:
new_data.append(decode(item, path))
return new_data
elif type(data) is bytes:
return b2h(data)
elif type(data) is dict:
new_data = {}
for key, item in data.items():
new_key = str(key)
if path == '/' and new_key == 'resultData':
new_item = decode_result_data(item)
elif (path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/successResult/' \
or path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/errorResult/') \
and new_key == 'simaResponse':
new_item = decode_sima_response(item)
else:
new_item = item
new_data[new_key] = decode(new_item, path + new_key + "/")
return new_data
else:
return data
class Es2pApiServerHandlerForLogging(Es2pApiServerHandlerMno):
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
logging.info("ES2+:handleDownloadProgressInfo: %s" % json.dumps(decode(data)))
return {}, None
if __name__ == "__main__":
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
Es2pApiServerMno(args.port, args.host, Es2pApiServerHandlerForLogging(), args.server_cert, args.client_ca_cert)

View File

@@ -126,14 +126,14 @@ class Es9pClient:
if self.opts.iccid:
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
if self.opts.operation == 'install':
if self.opts.operation == 'download':
pird = {
'transactionId': h2b(self.opts.transaction_id),
'transactionId': self.opts.transaction_id,
'notificationMetadata': ntf_metadata,
'smdpOid': self.opts.smdpp_oid,
'finalResult': ('successResult', {
'aid': h2b(self.opts.isdp_aid),
'simaResponse': h2b(self.opts.sima_response),
'aid': self.opts.isdp_aid,
'simaResponse': self.opts.sima_response,
}),
}
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)

View File

@@ -42,9 +42,6 @@ case "$JOB_TYPE" in
# Run pySim-shell integration tests (requires physical cards)
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
# Run pySim-smpp2sim test
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
;;
"distcheck")
virtualenv -p python3 venv --system-site-packages

View File

@@ -107,7 +107,7 @@ parser_esrv.add_argument('--output-file', required=True, help='Output file name'
parser_esrv.add_argument('--add-flag', default=[], choices=esrv_flag_choices, action='append', help='Add flag to mandatory services list')
parser_esrv.add_argument('--remove-flag', default=[], choices=esrv_flag_choices, action='append', help='Remove flag from mandatory services list')
parser_tree = subparsers.add_parser('tree', help='Display the filesystem tree')
parser_info = subparsers.add_parser('tree', help='Display the filesystem tree')
def write_pes(pes: ProfileElementSequence, output_file:str):
"""write the PE sequence to a file"""

View File

@@ -1,239 +0,0 @@
#!/usr/bin/env python3
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Harald Welte, Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import logging
import smpplib.gsm
import smpplib.client
import smpplib.consts
import time
from pySim.ota import OtaKeyset, OtaDialectSms, OtaAlgoCrypt, OtaAlgoAuth, CNTR_REQ, RC_CC_DS, POR_REQ
from pySim.utils import b2h, h2b, is_hexstr
from pathlib import Path
logger = logging.getLogger(Path(__file__).stem)
class SmppHandler:
client = None
def __init__(self, host: str, port: int,
system_id: str, password: str,
ota_keyset: OtaKeyset, spi: dict, tar: bytes):
"""
Initialize connection to SMPP server and set static OTA SMS-TPDU ciphering parameters
Args:
host : Hostname or IPv4/IPv6 address of the SMPP server
port : TCP Port of the SMPP server
system_id: SMPP System-ID used by ESME (client) to bind
password: SMPP Password used by ESME (client) to bind
ota_keyset: OTA keyset to be used for SMS-TPDU ciphering
spi: Security Parameter Indicator (SPI) to be used for SMS-TPDU ciphering
tar: Toolkit Application Reference (TAR) of the targeted card application
"""
# Create and connect SMPP client
client = smpplib.client.Client(host, port, allow_unknown_opt_params=True)
client.set_message_sent_handler(self.message_sent_handler)
client.set_message_received_handler(self.message_received_handler)
client.connect()
client.bind_transceiver(system_id=system_id, password=password)
self.client = client
# Setup static OTA parameters
self.ota_dialect = OtaDialectSms()
self.ota_keyset = ota_keyset
self.tar = tar
self.spi = spi
def __del__(self):
if self.client:
self.client.unbind()
self.client.disconnect()
def message_received_handler(self, pdu):
if pdu.short_message:
logger.info("SMS-TPDU received: %s", b2h(pdu.short_message))
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
# Retry to decoding with ciphering disabled (in case the card has problems to decode the SMS-TDPU
# we have sent, the response will contain an unencrypted error message)
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
logger.info("SMS-TPDU decoded: %s", dec)
self.response = dec
return None
def message_sent_handler(self, pdu):
logger.debug("SMS-TPDU sent: pdu_sequence=%s pdu_message_id=%s", pdu.sequence, pdu.message_id)
def transceive_sms_tpdu(self, tpdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple:
"""
Transceive SMS-TPDU. This method sends the SMS-TPDU to the SMPP server, and waits for a response. The method
returns when the response is received.
Args:
tpdu : short message content (plaintext)
src_addr : short message source address
dest_addr : short message destination address
timeout : timeout after which this method should give up waiting for a response
Returns:
tuple containing the response (plaintext)
"""
logger.info("SMS-TPDU sending: %s...", b2h(tpdu))
self.client.send_message(
# TODO: add parameters to switch source_addr_ton and dest_addr_ton between SMPP_TON_INTL and SMPP_NPI_ISDN
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
source_addr=src_addr,
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
destination_addr=dest_addr,
short_message=tpdu,
# TODO: add parameters to set data_coding and esm_class
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
# TODO: add parameter to use registered delivery
# registered_delivery=True,
)
logger.info("SMS-TPDU sent, waiting for response...")
timestamp_sent=int(time.time())
self.response = None
while self.response is None:
self.client.poll()
if int(time.time()) - timestamp_sent > timeout:
raise ValueError("Timeout reached, no response SMS-TPDU received!")
return self.response
def transceive_apdu(self, apdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple[bytes, bytes]:
"""
Transceive APDU. This method wraps the given APDU into an SMS-TPDU, sends it to the SMPP server and waits for
the response. When the response is received, the last response data and the last status word is extracted from
the response and returned to the caller.
Args:
apdu : one or more concatenated APDUs
src_addr : short message source address
dest_addr : short message destination address
timeout : timeout after which this method should give up waiting for a response
Returns:
tuple containing the last response data and the last status word as byte strings
"""
logger.info("C-APDU sending: %s...", b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
response = self.transceive_sms_tpdu(tpdu, src_addr, dest_addr, timeout)
# Extract last_response_data and last_status_word from the response
sw = None
resp = None
for container in response:
if container:
container_dict = dict(container)
resp = container_dict.get('last_response_data')
sw = container_dict.get('last_status_word')
if resp is None:
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
if sw is None:
raise ValueError("Response does not contain any last_status_word, no R-APDU received!")
logger.info("R-APDU received: %s %s", resp, sw)
return h2b(resp), h2b(sw)
if __name__ == '__main__':
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost")
option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int)
option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test")
option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test")
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
algo_crypt_choices = []
algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
for cls in algo_crypt_classes:
algo_crypt_choices.append(cls.enum_name)
option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2',
help="OTA crypt algorithm")
algo_auth_choices = []
algo_auth_classes = OtaAlgoAuth.__subclasses__()
for cls in algo_auth_classes:
algo_auth_choices.append(cls.enum_name)
option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2',
help="OTA auth algorithm")
option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)')
option_parser.add_argument('--kic_idx', default=1, type=int, help='OTA key index (KIC)')
option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)')
option_parser.add_argument('--kid_idx', default=1, type=int, help='OTA key index (KID)')
option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter')
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
option_parser.add_argument("--cntr_req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
help="Counter requirement")
option_parser.add_argument('--no-ciphering', action='store_true', default=False, help='Disable ciphering')
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
option_parser.add_argument('--por-in-submit', action='store_true', default=False,
help='require PoR to be sent via SMS-SUBMIT')
option_parser.add_argument('--por-no-ciphering', action='store_true', default=False, help='Disable ciphering (PoR)')
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
option_parser.add_argument("--por_req", choices=POR_REQ.decmapping.values(), default='por_required',
help="Proof of Receipt requirements")
option_parser.add_argument('--src-addr', default='12', type=str, help='SMS source address (MSISDN)')
option_parser.add_argument('--dest-addr', default='23', type=str, help='SMS destination address (MSISDN)')
option_parser.add_argument('--timeout', default=10, type=int, help='Maximum response waiting time')
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
opts = option_parser.parse_args()
logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
if opts.kic_idx != opts.kid_idx:
logger.warning("KIC index (%s) and KID index (%s) are different (security violation, card should reject message)",
opts.kic_idx, opts.kid_idx)
ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
kic_idx=opts.kic_idx,
kic=h2b(opts.kic),
algo_auth=opts.algo_auth,
kid_idx=opts.kid_idx,
kid=h2b(opts.kid),
cntr=opts.cntr)
spi = {'counter' : opts.cntr_req,
'ciphering' : not opts.no_ciphering,
'rc_cc_ds': opts.rc_cc_ds,
'por_in_submit': opts.por_in_submit,
'por_shall_be_ciphered': not opts.por_no_ciphering,
'por_rc_cc_ds': opts.por_rc_cc_ds,
'por': opts.por_req}
apdu = h2b("".join(opts.apdu))
smpp_handler = SmppHandler(opts.host, opts.port, opts.system_id, opts.password, ota_keyset, spi, h2b(opts.tar))
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
print("%s %s" % (b2h(resp), b2h(sw)))

View File

@@ -48,48 +48,39 @@ must be made available on multiple sites, the `CardKeyProviderPgsql` is the
better option.
The CardKeyProviderPgsql
The CardKeyProviderPqsql
------------------------
With the `CardKeyProviderPgsql` you can use a PostgreSQL database as storage
With the `CardKeyProviderPsql` you can use a PostgreSQL database as storage
medium. The implementation comes with a CSV importer tool that consumes the
same CSV files you would normally use with the `CardKeyProviderCsv`, so you
can just use your existing CSV files and import them into the database.
Requirements
^^^^^^^^^^^^
The `CardKeyProviderPgsql` uses the `Psycopg` PostgreSQL database adapter
(https://www.psycopg.org). `Psycopg` is not part of the default requirements
of pySim-shell and must be installed separately. `Psycopg` is available as
Python package under the name `psycopg2-binary`.
Setting up the database
^^^^^^^^^^^^^^^^^^^^^^^
From the perspective of the database, the `CardKeyProviderPgsql` has only
From the perspective of the database, the `CardKeyProviderPsql` has only
minimal requirements. You do not have to create any tables in advance. An empty
database and at least one user that may create, alter and insert into tables is
sufficient. However, for increased reliability and as a protection against
incorrect operation, the `CardKeyProviderPgsql` supports a hierarchical model
incorrect operation, the `CardKeyProviderPsql` supports a hierarchical model
with three users (or roles):
* **admin**:
This should be the owner of the database. It is intended to be used for
administrative tasks like adding new tables or adding new columns to existing
tables. This user should not be used to insert new data into tables or to access
data from within pySim-shell using the `CardKeyProviderPgsql`
data from within pySim-shell using the `CardKeyProviderPsql`
* **importer**:
This user is used when feeding new data into an existing table. It should only
be able to insert new rows into existing tables. It should not be used for
administrative tasks or to access data from within pySim-shell using the
`CardKeyProviderPgsql`
`CardKeyProviderPsql`
* **reader**:
To access data from within pySim shell using the `CardKeyProviderPgsql` the
To access data from within pySim shell using the `CardKeyProviderPsql` the
reader user is the correct one to use. This user should have no write access
to the database or any of the tables.
@@ -97,7 +88,7 @@ with three users (or roles):
Creating a config file
^^^^^^^^^^^^^^^^^^^^^^
The default location for the config file is `~/.osmocom/pysim/card_data_pgsql.cfg`
The default location for the config file is `~/.osmocom/pysim/card_data_pqsql.cfg`
The file uses `yaml` syntax and should look like the example below:
::
@@ -129,9 +120,9 @@ passwords for each of the aforementioned users (or roles). In case only a single
admin user is used, all three entries may be populated with the same user name
and password (not recommended)
The field `table_names` sets the tables that the `CardKeyProviderPgsql` shall
The field `table_names` sets the tables that the `CardKeyProviderPsql` shall
use to query to locate card key data. You can set up as many tables as you
want, `CardKeyProviderPgsql` will query them in order, one by one until a
want, `CardKeyProviderPsql` will query them in order, one by one until a
matching entry is found.
NOTE: In case you do not want to disclose the admin and the importer credentials
@@ -173,7 +164,7 @@ with the `admin` user. The `admin` user is selected using the `--admin` switch.
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin
INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_admin_user
@@ -191,7 +182,7 @@ now ready to be filled with data.
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_importer_user
@@ -223,7 +214,7 @@ creating new tables, this operation also requires admin privileges, so the
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin
INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_admin_user
@@ -240,7 +231,7 @@ first one:
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_importer_user

View File

@@ -44,7 +44,6 @@ from pySim.exceptions import SwMatchError
from pySim.legacy.cards import card_detect, SimCard, UsimCard, IsimCard
from pySim.utils import dec_imsi, dec_iccid
from pySim.legacy.utils import format_xplmn_w_act, dec_st, dec_msisdn
from pySim.ts_51_011 import EF_SMSP
option_parser = argparse.ArgumentParser(description='Legacy tool for reading some parts of a SIM card',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -142,15 +141,6 @@ if __name__ == '__main__':
(res, sw) = card.read_record('SMSP', 1)
if sw == '9000':
print("SMSP: %s" % (res,))
ef_smsp = EF_SMSP()
smsc_a = ef_smsp.decode_record_bin(h2b(res), 1).get('tp_sc_addr', {})
smsc_n = smsc_a.get('call_number', None)
if smsc_a.get('ton_npi', {}).get('type_of_number', None) == 'international' and smsc_n is not None:
smsc = '+' + smsc_n
else:
smsc = smsc_n
if smsc is not None:
print("SMSC: %s" % (smsc,))
else:
print("SMSP: Can't read, response code = %s" % (sw,))

View File

@@ -74,7 +74,7 @@ from pySim.card_key_provider import card_key_provider_register, card_key_provide
from pySim.app import init_card
log = PySimLogger.get(Path(__file__).stem)
log = PySimLogger.get("main")
class Cmd2Compat(cmd2.Cmd):
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
@@ -519,17 +519,8 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
@cmd2.with_category(CUSTOM_CATEGORY)
def do_version(self, opts):
"""Print the pySim software version."""
from importlib.metadata import version as vsn
self.poutput("pyosmocom " + vsn('pyosmocom'))
import os
cwd = os.path.dirname(os.path.realpath(__file__))
if os.path.isdir(os.path.join(cwd, ".git")):
import subprocess
url = subprocess.check_output(['git', 'config', '--get', 'remote.origin.url']).decode('ascii').strip()
version = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=cwd).decode('ascii').strip()
self.poutput(os.path.basename(url) + " " + version)
else:
self.poutput("pySim " + vsn('pySim'))
import pkg_resources
self.poutput(pkg_resources.get_distribution('pySim'))
@with_default_category('pySim Commands')
class PySimCommands(CommandSet):
@@ -1147,10 +1138,10 @@ global_group.add_argument("--verbose", help="Enable verbose logging",
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
card_key_group.add_argument('--csv', metavar='FILE',
default="~/.osmocom/pysim/card_data.csv",
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file')
card_key_group.add_argument('--pgsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg",
card_key_group.add_argument('--pqsql', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)')
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key')
@@ -1176,7 +1167,7 @@ if __name__ == '__main__':
# Ensure that we are able to print formatted warnings from the beginning.
PySimLogger.setup(print, {logging.WARN: YELLOW})
if opts.verbose:
if (opts.verbose):
PySimLogger.set_verbose(True)
PySimLogger.set_level(logging.DEBUG)
else:
@@ -1189,10 +1180,10 @@ if __name__ == '__main__':
for par in opts.column_key:
name, key = par.split(':')
column_keys[name] = key
if os.path.isfile(os.path.expanduser(opts.csv)):
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), column_keys))
if os.path.isfile(os.path.expanduser(opts.pgsql)):
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), column_keys))
if os.path.isfile(opts.csv):
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
if os.path.isfile(opts.pqsql):
card_key_provider_register(CardKeyProviderPgsql(opts.pqsql, column_keys))
# Init card reader driver
sl = init_reader(opts, proactive_handler = Proact())

View File

@@ -23,7 +23,6 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
from pySim.apdu_source.stdin_hex import StdinHexApduSource
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
@@ -191,10 +190,6 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
help='Name of the log file to be read')
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
Read APDUs as hex-string from stdin.""")
if __name__ == '__main__':
opts = option_parser.parse_args()
@@ -210,8 +205,6 @@ if __name__ == '__main__':
s = PysharkGsmtapPcap(opts.pcap_file)
elif opts.source == 'tca-loader-log':
s = TcaLoaderLogApduSource(opts.log_file)
elif opts.source == 'stdin-hex':
s = StdinHexApduSource()
else:
raise ValueError("unsupported source %s", opts.source)

View File

@@ -84,5 +84,5 @@ class PysharkGsmtapPcap(_PysharkGsmtap):
Args:
pcap_filename: File name of the pcap file to be opened
"""
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim || iso7816.atr', use_json=True, keep_packets=False)
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim', use_json=True, keep_packets=False)
super().__init__(pyshark_inst)

View File

@@ -1,39 +0,0 @@
# coding=utf-8
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.utils import h2b
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
class StdinHexApduSource(ApduSource):
"""ApduSource for reading apdu hex-strings from stdin."""
def read_packet(self) -> PacketType:
while True:
command = input("C-APDU >")
if len(command) == 0:
continue
response = '9000'
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))

View File

@@ -7,7 +7,7 @@ there are also automatic card feeders.
"""
#
# (C) 2019 by sysmocom - s.f.m.c. GmbH
# (C) 2019 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify

View File

@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
operation with pySim-shell.
"""
# (C) 2021-2025 by sysmocom - s.f.m.c. GmbH
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier, Harald Welte
@@ -37,8 +37,10 @@ import abc
import csv
import logging
import yaml
import psycopg2
from psycopg2.sql import Identifier, SQL
log = PySimLogger.get(__name__)
log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider']
@@ -58,7 +60,7 @@ class CardKeyFieldCryptor:
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
'SCP03_ISDA': ['SCP03_ENC_ISDA', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
}
@@ -197,7 +199,6 @@ class CardKeyProviderPgsql(CardKeyProvider):
config_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
"""
import psycopg2
log.info("Using SQL database as card key data source: %s" % config_filename)
with open(config_filename, "r") as cfg:
config = yaml.load(cfg, Loader=yaml.FullLoader)
@@ -215,8 +216,6 @@ class CardKeyProviderPgsql(CardKeyProvider):
self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
import psycopg2
from psycopg2.sql import Identifier, SQL
db_result = None
for t in self.tables:
self.conn.rollback()

View File

@@ -54,8 +54,6 @@ def compile_asn1_subdir(subdir_name:str, codec='der'):
__ver = sys.version_info
if (__ver.major, __ver.minor) >= (3, 9):
for i in resources.files('pySim.esim').joinpath('asn1').joinpath(subdir_name).iterdir():
if not i.name.endswith('.asn'):
continue
asn_txt += i.read_text()
asn_txt += "\n"
#else:

View File

@@ -27,7 +27,7 @@ logger.setLevel(logging.DEBUG)
class param:
class Iccid(ApiParamString):
"""String representation of 18 to 20 digits, where the 20th digit MAY optionally be the padding
"""String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
character F."""
@classmethod
def _encode(cls, data):
@@ -40,7 +40,7 @@ class param:
@classmethod
def verify_encoded(cls, data):
if len(data) not in (18, 19, 20):
if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
@classmethod
@@ -53,7 +53,7 @@ class param:
@classmethod
def verify_decoded(cls, data):
data = str(data)
if len(data) not in (18, 19, 20):
if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
if len(data) == 19:
decimal_part = data

View File

@@ -149,8 +149,7 @@ class ApiError(Exception):
'message': None,
}
actual_sec = func_ex_status.get('statusCodeData', None)
if actual_sec:
sec.update(actual_sec)
sec.update(actual_sec)
self.subject_code = sec['subjectCode']
self.reason_code = sec['reasonCode']
self.subject_id = sec['subjectIdentifier']
@@ -256,10 +255,5 @@ class JsonHttpApiFunction(abc.ABC):
raise HttpHeaderError(response)
if response.content:
if response.headers.get('Content-Type').startswith('application/json'):
return self.decode(response.json())
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
return { 'data': response.content.decode('utf-8') }
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
return self.decode(response.json())
return None

View File

@@ -21,8 +21,6 @@ import io
import os
from typing import Tuple, List, Optional, Dict, Union
from collections import OrderedDict
from difflib import SequenceMatcher, Match
import asn1tools
import zipfile
from pySim import javacard
@@ -46,29 +44,6 @@ asn1 = compile_asn1_subdir('saip')
logger = logging.getLogger(__name__)
class NonMatch(Match):
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
@classmethod
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
The caller must ensure that the input list is ordered, non-overlapping and only contains
matches at equal offsets in a and b."""
res = []
cur = 0
for match in l:
if match.a != match.b:
raise ValueError('only works for equal-offset matches')
assert match.a >= cur
nm_len = match.a - cur
if nm_len > 0:
# there's no point in generating zero-lenth non-matching sections
res.append(cls(a=cur, b=cur, size=nm_len))
cur = match.a + match.size
if size > cur:
res.append(cls(a=cur, b=cur, size=size-cur))
return res
class Naa:
"""A class defining a Network Access Application (NAA)"""
name = None
@@ -169,9 +144,6 @@ class File:
def file_size(self) -> Optional[int]:
"""Return the size of the file in bytes."""
if self.file_type in ['LF', 'CY']:
if self._file_size and self.nb_rec is None and self.rec_len:
self.nb_rec = self._file_size // self.rec_len
return self.nb_rec * self.rec_len
elif self.file_type in ['TR', 'BT']:
return self._file_size
@@ -319,10 +291,6 @@ class File:
dfName = fileDescriptor.get('dfName', None)
if dfName:
self.df_name = dfName
efFileSize = fileDescriptor.get('efFileSize', None)
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
pefi = fileDescriptor.get('proprietaryEFInfo', {})
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
if securityAttributesReferenced:
@@ -332,11 +300,13 @@ class File:
fdb_dec = fd_dec['file_descriptor_byte']
self.shareable = fdb_dec['shareable']
if fdb_dec['file_type'] == 'working_ef':
efFileSize = fileDescriptor.get('efFileSize', None)
if fd_dec['num_of_rec']:
self.nb_rec = fd_dec['num_of_rec']
if fd_dec['record_len']:
self.rec_len = fd_dec['record_len']
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
if self.rec_len and self.nb_rec == None:
# compute the number of records from file size and record length
self.nb_rec = self._file_size // self.rec_len
@@ -436,40 +406,12 @@ class File:
return ValueError("Unknown key '%s' in tuple list" % k)
return stream.getvalue()
def file_content_to_tuples(self, optimize:bool = False) -> List[Tuple]:
"""Encode the file contents into a list of fillFileContent / fillFileOffset tuples that can be fed
into the asn.1 encoder. If optimize is True, it will try to encode only the differences from the
fillFileContent of the profile template. Otherwise, the entire file contents will be encoded
as-is."""
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
return []
if not optimize:
# simplistic approach: encode the full file, ignoring the template/default
return [('fillFileContent', self.body)]
# Try to 'compress' the file body, based on the default file contents.
if self.template:
default = self.template.expand_default_value_pattern(length=len(self.body))
if not default:
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
else:
if default == self.body:
# 100% match: return an empty tuple list to make eUICC use the default
return []
sm = SequenceMatcher(a=default, b=self.body)
else:
# no template at all: we can only remove padding
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
matching_blocks = sm.get_matching_blocks()
# we can only make use of matches that have the same offset in 'a' and 'b'
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
ret = []
cur = 0
for block in non_matching_blocks:
ret.append(('fillFileOffset', block.a - cur))
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
cur += block.size
return ret
def file_content_to_tuples(self) -> List[Tuple]:
# FIXME: simplistic approach. needs optimization. We should first check if the content
# matches the expanded default value from the template. If it does, return empty list.
# Next, we should compute the diff between the default value and self.body, and encode
# that as a sequence of fillFileOffset and fillFileContent tuples.
return [('fillFileContent', self.body)]
def __str__(self) -> str:
return "File(%s)" % self.pe_name
@@ -691,15 +633,8 @@ class FsProfileElement(ProfileElement):
self.pe_sequence.cur_df = pe_df
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
def file2pe(self, file: File):
"""Update the "decoded" member for the given file with the contents from the given File instance.
We expect that the File instance is part of self.files"""
if self.files[file.pe_name] != file:
raise ValueError("The file you passed is not part of this ProfileElement")
self.decoded[file.pe_name] = file.to_tuples()
def files2pe(self):
"""Update the "decoded" member for each file with the contents of the "files" member."""
"""Update the "decoded" member with the contents of the "files" member."""
for k, f in self.files.items():
self.decoded[k] = f.to_tuples()
@@ -1071,13 +1006,6 @@ class SecurityDomainKey:
'keyVersionNumber': bytes([self.key_version_number]),
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
def get_key_component(self, key_type):
for kc in self.key_components:
if kc.key_type == key_type:
return kc.key_data
return None
class ProfileElementSD(ProfileElement):
"""Class representing a securityDomain ProfileElement."""
type = 'securityDomain'
@@ -1092,7 +1020,6 @@ class ProfileElementSD(ProfileElement):
def __init__(self, decoded: Optional[dict] = None, **kwargs):
super().__init__(decoded, **kwargs)
if decoded:
self._post_decode()
return
# provide some reasonable defaults for a MNO-SD
self.decoded['instance'] = {
@@ -1811,7 +1738,8 @@ class ProfileElementSequence:
del hdr.decoded['eUICC-Mandatory-services'][service]
# remove any associated mandatory filesystem templates
for template in naa.templates:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
# determine the ADF names (AIDs) of all NAA ADFs
naa_adf_names = []
if naa.pe_types[0] in self.pe_by_type:
@@ -1854,7 +1782,7 @@ class ProfileElementSequence:
return None
@staticmethod
def peclass_for_path(path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
def peclass_for_path(path: Path) -> Optional[ProfileElement]:
"""Return the ProfileElement class that can contain a file with given path."""
naa = ProfileElementSequence.naa_for_path(path)
if naa:
@@ -1887,7 +1815,7 @@ class ProfileElementSequence:
return ProfileElementTelecom, ft
return ProfileElementGFM, None
def pe_for_path(self, path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
def pe_for_path(self, path: Path) -> Optional[ProfileElement]:
"""Return the ProfileElement instance that can contain a file with matching path. This will
either be an existing PE within the sequence, or it will be a newly-allocated PE that is
inserted into the sequence."""
@@ -1953,10 +1881,7 @@ class ProfileElementSequence:
class FsNode:
"""A node in the filesystem hierarchy. Each node can have a parent node and any number of children.
Each node is identified uniquely within the parent by its numeric FID and its optional human-readable
name. Each node usually is associated with an instance of the File class for the actual content of
the file. FsNode is the base class used by more specific nodes, such as FsNode{EF,DF,ADF,MF}."""
"""A node in the filesystem hierarchy."""
def __init__(self, fid: int, parent: Optional['FsNode'], file: Optional[File] = None,
name: Optional[str] = None):
self.fid = fid
@@ -2011,7 +1936,7 @@ class FsNode:
return x
def walk(self, fn, **kwargs):
"""call 'fn(self, ``**kwargs``) for the File."""
"""call 'fn(self, **kwargs) for the File."""
return [fn(self, **kwargs)]
class FsNodeEF(FsNode):
@@ -2101,7 +2026,7 @@ class FsNodeDF(FsNode):
return cur
def walk(self, fn, **kwargs):
"""call 'fn(self, ``**kwargs``) for the DF and recursively for all children."""
"""call 'fn(self, **kwargs) for the DF and recursively for all children."""
ret = super().walk(fn, **kwargs)
for c in self.children.values():
ret += c.walk(fn, **kwargs)

View File

@@ -1,361 +0,0 @@
"""Implementation of Personalization of eSIM profiles in SimAlliance/TCA Interoperable Profile:
Run a batch of N personalizations"""
# (C) 2025-2026 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: nhofmeyr@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import pprint
from typing import List, Generator
from pySim.esim.saip.personalization import ConfigurableParameter
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
from pySim.global_platform import KeyUsageQualifier
from osmocom.utils import b2h
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
Usage example:
der_input = some_file.open('rb').read()
pes = ProfileElementSequence.from_der(der_input)
p = pers.BatchPersonalization(
n=10,
src_pes=pes,
csv_rows=get_csv_reader())
p.add_param_and_src(
personalization.Iccid(),
param_source.IncDigitSource(
num_digits=18,
first_value=123456789012340001,
last_value=123456789012340010))
# add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile
# ...
# generate all 10 profiles (from n=10 above)
for result_pes in p.generate_profiles():
upp = result_pes.to_der()
store_upp(upp)
"""
class ParamAndSrc:
'tie a ConfigurableParameter to a source of actual values'
def __init__(self, param_cls: ConfigurableParameter, src: param_source.ParamSource):
self.param_cls = param_cls
self.src = src
def __init__(self,
n: int,
src_pes: ProfileElementSequence,
params: list[ParamAndSrc]=[],
csv_rows: Generator=None,
):
"""
n: number of eSIM profiles to generate.
src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only
copied.
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
profile values.
csv_rows: A generator (or iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
param_source.CsvSource.
"""
self.n = n
self.params = params or []
self.src_pes = src_pes
self.csv_rows = csv_rows
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
if isinstance(param, type):
param_cls = param
else:
param_cls = param.__class__
self.params.append(BatchPersonalization.ParamAndSrc(param_cls=param_cls, src=src))
def generate_profiles(self):
# get first row of CSV: column names
csv_columns = None
if self.csv_rows:
try:
csv_columns = next(self.csv_rows)
except StopIteration as e:
raise ValueError('the input CSV file appears to be empty') from e
for i in range(self.n):
csv_row = None
if self.csv_rows and csv_columns:
try:
csv_row_list = next(self.csv_rows)
except StopIteration as e:
raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
csv_row = dict(zip(csv_columns, csv_row_list))
pes = copy.deepcopy(self.src_pes)
for p in self.params:
try:
input_value = p.src.get_next(csv_row=csv_row)
assert input_value is not None
value = p.param_cls.validate_val(input_value)
p.param_cls.apply_val(pes, value)
except Exception as e:
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
yield pes
class UppAudit(dict):
"""
Key-value pairs collected from a single UPP DER or PES.
UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP.
"""
@classmethod
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
'''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
both 'IMSI' and 'IMSI-ACC' parameters.
e.g.
UppAudit.from_der(my_der, [Imsi, ])
--> {'IMSI': '001010000000023', 'IMSI-ACC': '5'}
(where 'IMSI' == Imsi.name)
Read all parameters listed in params. params is a list of either ConfigurableParameter classes or
ConfigurableParameter class instances. This calls only classmethods, so each entry in params can either be the
class itself, or a class-instance of, a (non-abstract) ConfigurableParameter subclass.
For example, params = [Imsi, ] is equivalent to params = [Imsi(), ].
For der_size=True, also include a {'der_size':12345} entry.
For additional_sd_keys=True, output also all Security Domain KVN that there are *no* ConfigurableParameter
subclasses for. For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only Scp80Kvn01, Scp80Kvn02,
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
unexpected / unusual kvn.
'''
# make an instance of this class
upp_audit = cls()
if der_size:
upp_audit['der_size'] = set((len(der), ))
pes = ProfileElementSequence.from_der(der)
for param in params:
try:
for valdict in param.get_values_from_pes(pes):
upp_audit.add_values(valdict)
except Exception as e:
raise ValueError(f'Error during audit for parameter {param}: {e}') from e
if not additional_sd_keys:
return upp_audit
# additional_sd_keys
for pe in pes.pe_list:
if pe.type != 'securityDomain':
continue
assert isinstance(pe, ProfileElementSD)
for key in pe.keys:
audit_key = f'SdKey_KVN{key.key_version_number:02x}_ID{key.key_identifier:02x}'
kuq_bin = KeyUsageQualifier.build(key.key_usage_qualifier).hex()
audit_val = f'{key.key_components=!r} key_usage_qualifier=0x{kuq_bin}={key.key_usage_qualifier!r}'
upp_audit[audit_key] = set((audit_val, ))
return upp_audit
def get_single_val(self, key, validate=True, allow_absent=False, absent_val=None):
"""
Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
return that value. When they do not agree, raise a ValueError.
"""
# key should be a string, but if someone passes a ConfigurableParameter, just use its default name
if ConfigurableParameter.is_super_of(key):
key = key.get_name()
assert isinstance(key, str)
v = self.get(key)
if v is None and allow_absent:
return absent_val
if not isinstance(v, set):
raise ValueError(f'audit value should be a set(), got {v!r}')
if len(v) != 1:
raise ValueError(f'expected a single value for {key}, got {v!r}')
v = tuple(v)[0]
return v
@staticmethod
def audit_val_to_str(v):
"""
Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
audit values are always python sets. Turn it into a nice string representation: only the value when it is
unambiguous, otherwise a list of the ambiguous values.
A value may also be completely absent, then return 'not present'.
"""
def try_single_val(w):
'change single-entry sets to just the single value'
if isinstance(w, set):
if len(w) == 1:
return tuple(w)[0]
if len(w) == 0:
return None
return w
v = try_single_val(v)
if isinstance(v, bytes):
v = bytes_to_hexstr(v)
if v is None:
return 'not present'
return str(v)
def get_val_str(self, key):
"""Return a string of the value stored for the given key"""
return UppAudit.audit_val_to_str(self.get(key))
def add_values(self, src:dict):
"""self and src are both a dict of sets.
For example from
self == { 'a': set((123,)) }
and
src == { 'a': set((456,)), 'b': set((789,)) }
then after this function call:
self == { 'a': set((123, 456,)), 'b': set((789,)) }
"""
assert isinstance(src, dict)
for key, srcvalset in src.items():
dstvalset = self.get(key)
if dstvalset is None:
dstvalset = set()
self[key] = dstvalset
dstvalset.add(srcvalset)
def __str__(self):
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
class BatchAudit(list):
"""
Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
Produce an output CSV.
Usage example:
ba = BatchAudit(params=(personalization.Iccid, ))
for upp_der in upps:
ba.add_audit(upp_der)
print(ba.summarize())
with open('output.csv', 'wb') as csv_data:
csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
csv.writer(csv_str).writerows( ba.to_csv_rows() )
csv_str.flush()
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
"""
def __init__(self, params:List):
assert params
self.params = params
def add_audit(self, upp_der:bytes):
audit = UppAudit.from_der(upp_der, self.params)
self.append(audit)
return audit
def summarize(self):
batch_audit = UppAudit()
audits = self
if len(audits) > 2:
val_sep = ', ..., '
else:
val_sep = ', '
first_audit = None
last_audit = None
if len(audits) >= 1:
first_audit = audits[0]
if len(audits) >= 2:
last_audit = audits[-1]
if first_audit:
if last_audit:
for key in first_audit.keys():
first_val = first_audit.get_val_str(key)
last_val = last_audit.get_val_str(key)
if first_val == last_val:
val = first_val
else:
val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
val = val_sep_with_newline.join((first_val, last_val))
batch_audit[key] = val
else:
batch_audit.update(first_audit)
return batch_audit
def to_csv_rows(self, headers=True, sort_key=None):
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
columns = set()
for audit in self:
columns.update(audit.keys())
columns = tuple(sorted(columns, key=sort_key))
if headers:
yield columns
for audit in self:
yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
def bytes_to_hexstr(b:bytes, sep=''):
return sep.join(f'{x:02x}' for x in b)
def esim_profile_introspect(upp):
pes = ProfileElementSequence.from_der(upp.read())
d = {}
d['upp'] = repr(pes)
def show_bytes_as_hexdump(item):
if isinstance(item, bytes):
return bytes_to_hexstr(item)
if isinstance(item, list):
return list(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, tuple):
return tuple(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, dict):
d = {}
for k, v in item.items():
d[k] = show_bytes_as_hexdump(v)
return d
return item
l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
d['pp'] = pprint.pformat(l, width=120)
return d

View File

@@ -1,232 +0,0 @@
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: nhofmeyr@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import secrets
import re
from osmocom.utils import b2h
class ParamSourceExn(Exception):
pass
class ParamSourceExhaustedExn(ParamSourceExn):
pass
class ParamSourceUndefinedExn(ParamSourceExn):
pass
class ParamSource:
"""abstract parameter source. For usage, see personalization.BatchPersonalization."""
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
name = "none"
numeric_base = None # or 10 or 16
def __init__(self, input_val:str):
"""Subclasses must implement a constructor with this signature. The input_val is typically a user entered string
value, and the subclass chooses how to store it; for example ConstantSource() just does self.val = input_val."""
pass
@classmethod
def from_str(cls, s:str):
"""Subclasses implement this:
if a parameter source defines some string input magic, override this function.
For example, a RandomDigitSource derives the number of digits from the string length,
so the user can enter '0000' to get a four digit random number."""
return cls(s)
def get_next(self, csv_row:dict=None):
"""Subclasses implement this: return the next value from the parameter source.
When there are no more values from the source, raise a ParamSourceExhaustedExn.
This default implementation is an empty source."""
raise ParamSourceExhaustedExn()
class ConstantSource(ParamSource):
"""one value for all"""
name = "constant"
def __init__(self, input_val:str):
self.val = input_val
def get_next(self, csv_row:dict=None):
return self.val
class InputExpandingParamSource(ParamSource):
@classmethod
def expand_str(cls, s:str):
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
if "*" not in s:
return s
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
if len(tokens) < 3:
return s
parts = []
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
parts.append(unchanged)
repeat = int(repeat_str)
parts.append(snippet * repeat)
return "".join(parts)
@classmethod
def from_str(cls, s:str):
return cls(cls.expand_str(s))
class DecimalRangeSource(InputExpandingParamSource):
"""abstract: decimal numbers with a value range"""
numeric_base = 10
def __init__(self, num_digits, first_value, last_value):
"""
See also from_str().
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
num_digits: fixed number of digits (possibly with leading zeros) to generate.
first_value, last_value: the decimal range in which to provide digits.
"""
num_digits = int(num_digits)
first_value = int(first_value)
last_value = int(last_value)
assert num_digits > 0
assert first_value <= last_value
self.num_digits = num_digits
self.val_first_last = (first_value, last_value)
def val_to_digit(self, val:int):
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
@classmethod
def from_str(cls, s:str):
s = cls.expand_str(s)
if ".." in s:
first_str, last_str = s.split('..')
first_str = first_str.strip()
last_str = last_str.strip()
else:
first_str = s.strip()
last_str = None
first_value = int(first_str)
last_value = int(last_str if last_str is not None else "9" * len(first_str))
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
class RandomSourceMixin:
random_impl = secrets.SystemRandom()
class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
"""return a different sequence of random decimal digits each"""
name = "random decimal digits"
used_keys = set()
def get_next(self, csv_row:dict=None):
# try to generate random digits that are always different from previously produced random bytes
attempts = 10
while True:
val = self.random_impl.randint(*self.val_first_last)
if val in RandomDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomDigitSource.used_keys.add(val)
break
return self.val_to_digit(val)
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
"""return a different sequence of random hexadecimal digits each"""
name = "random hexadecimal digits"
numeric_base = 16
used_keys = set()
def __init__(self, num_digits):
"""see from_str()"""
num_digits = int(num_digits)
if num_digits < 1:
raise ValueError("zero number of digits")
# hex digits always come in two
if (num_digits & 1) != 0:
raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}")
self.num_digits = num_digits
def get_next(self, csv_row:dict=None):
# try to generate random bytes that are always different from previously produced random bytes
attempts = 10
while True:
val = self.random_impl.randbytes(self.num_digits // 2)
if val in RandomHexDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomHexDigitSource.used_keys.add(val)
break
return b2h(val)
@classmethod
def from_str(cls, s:str):
s = cls.expand_str(s)
return cls(num_digits=len(s.strip()))
class IncDigitSource(DecimalRangeSource):
"""incrementing sequence of digits"""
name = "incrementing decimal digits"
def __init__(self, num_digits, first_value, last_value):
super().__init__(num_digits, first_value, last_value)
self.next_val = None
self.reset()
def reset(self):
"""Restart from the first value of the defined range passed to __init__()."""
self.next_val = self.val_first_last[0]
def get_next(self, csv_row:dict=None):
val = self.next_val
if val is None:
raise ParamSourceExhaustedExn()
returnval = self.val_to_digit(val)
val += 1
if val > self.val_first_last[1]:
self.next_val = None
else:
self.next_val = val
return returnval
class CsvSource(ParamSource):
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
name = "from CSV"
def __init__(self, csv_column):
"""
csv_column: column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
"""
self.csv_column = csv_column
def get_next(self, csv_row:dict=None):
val = None
if csv_row:
val = csv_row.get(self.csv_column)
if not val:
raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}")
return val

File diff suppressed because it is too large Load Diff

View File

@@ -673,7 +673,7 @@ class FilesUsimDf5GS(ProfileTemplate):
FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]),
FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]),
FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130], pe_name='ef-supinai'),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130]),
FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]),
]
@@ -818,7 +818,7 @@ class FilesIsimOptional(ProfileTemplate):
base_path = Path('ADF.ISIM')
extends = FilesIsimMandatory
files = [
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5], pe_name='ef-pcscf'),
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5]),
FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]),
FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]),
FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]),

View File

@@ -103,26 +103,6 @@ class CheckBasicStructure(ProfileConstraintChecker):
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
def check_mandatory_services_aka(self, pes: ProfileElementSequence):
"""Ensure that no unnecessary authentication related services are marked as mandatory but not
actually used within the profile"""
m_svcs = pes.get_pe_for_type('header').decoded['eUICC-Mandatory-services']
# list of tuples (algo_id, key_len_in_octets) for all the akaParameters in the PE Sequence
algo_id_klen = [(x.decoded['algoConfiguration'][1]['algorithmID'],
len(x.decoded['algoConfiguration'][1]['key'])) for x in pes.get_pes_for_type('akaParameter')]
# just a plain list of algorithm IDs in akaParameters
algorithm_ids = [x[0] for x in algo_id_klen]
if 'milenage' in m_svcs and not 1 in algorithm_ids:
raise ProfileError('milenage mandatory, but no related algorithm_id in akaParameter')
if 'tuak128' in m_svcs and not (2, 128/8) in algo_id_klen:
raise ProfileError('tuak128 mandatory, but no related algorithm_id in akaParameter')
if 'cave' in m_svcs and not pes.get_pe_for_type('cdmaParameter'):
raise ProfileError('cave mandatory, but no related cdmaParameter')
if 'tuak256' in m_svcs and (2, 256/8) in algo_id_klen:
raise ProfileError('tuak256 mandatory, but no related algorithm_id in akaParameter')
if 'usim-test-algorithm' in m_svcs and not 3 in algorithm_ids:
raise ProfileError('usim-test-algorithm mandatory, but no related algorithm_id in akaParameter')
def check_identification_unique(self, pes: ProfileElementSequence):
"""Ensure that each PE has a unique identification value."""
id_list = [pe.header['identification'] for pe in pes.pe_list if pe.header]

View File

@@ -181,7 +181,7 @@ class SeqNumber(BER_TLV_IE, tag=0x80):
class NotificationAddress(BER_TLV_IE, tag=0x0c):
_construct = Utf8Adapter(GreedyBytes)
class Iccid(BER_TLV_IE, tag=0x5a):
_construct = PaddedBcdAdapter(GreedyBytes)
_construct = BcdAdapter(GreedyBytes)
class NotificationMetadata(BER_TLV_IE, tag=0xbf2f, nested=[SeqNumber, ProfileMgmtOperation,
NotificationAddress, Iccid]):
pass

View File

@@ -1,6 +1,6 @@
# GlobalPlatform install parameter generator
#
# (C) 2024 by sysmocom - s.f.m.c. GmbH
# (C) 2024 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify

View File

@@ -91,7 +91,6 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
# Key Usage:
# KVN 0x01 .. 0x0F reserved for SCP80
# KVN 0x81 .. 0x8f reserved for SCP81
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
# KVN 0x20 .. 0x2F reserved for SCP02
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK

View File

@@ -1,6 +1,6 @@
# JavaCard related utilities
#
# (C) 2024 by sysmocom - s.f.m.c. GmbH
# (C) 2024 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify

View File

@@ -4,7 +4,7 @@
"""
#
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
@@ -44,7 +44,7 @@ class PySimLogger:
"""
LOG_FMTSTR = "%(levelname)s: %(message)s"
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- " + LOG_FMTSTR
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
__formatter = logging.Formatter(LOG_FMTSTR)
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
@@ -108,7 +108,7 @@ class PySimLogger:
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
color = PySimLogger.colors.get(record.levelno)
if color:
if isinstance(color, str):
if type(color) is str:
PySimLogger.print_callback(color + formatted_message + "\033[0m")
else:
PySimLogger.print_callback(style(formatted_message, fg = color))

View File

@@ -57,13 +57,12 @@ CompactRemoteResp = Struct('number_of_commands'/Int8ub,
'last_response_data'/HexAdapter(GreedyBytes))
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
CNTR_REQ = Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, counter_must_be_higher=2, counter_must_be_lower=3)
POR_REQ = Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2)
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
SPI = BitStruct( # first octet
Padding(3),
'counter'/CNTR_REQ,
'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
counter_must_be_higher=2, counter_must_be_lower=3),
'ciphering'/Flag,
'rc_cc_ds'/RC_CC_DS,
# second octet
@@ -71,7 +70,8 @@ SPI = BitStruct( # first octet
'por_in_submit'/Flag,
'por_shall_be_ciphered'/Flag,
'por_rc_cc_ds'/RC_CC_DS,
'por'/POR_REQ
'por'/Enum(BitsInteger(2), no_por=0,
por_required=1, por_only_when_error=2)
)
# TS 102 225 Section 5.1.2

View File

@@ -4,7 +4,7 @@
"""
#
# (C) 2021 by sysmocom - s.f.m.c. GmbH
# (C) 2021 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify

View File

@@ -1,5 +1,4 @@
# coding=utf-8
"""Representation of the runtime state of an application like pySim-shell.
"""
@@ -26,7 +25,7 @@ from pySim.exceptions import *
from pySim.filesystem import *
from pySim.log import PySimLogger
log = PySimLogger.get(__name__)
log = PySimLogger.get("RUNTIME")
def lchan_nr_from_cla(cla: int) -> int:
"""Resolve the logical channel number from the CLA byte."""
@@ -116,7 +115,7 @@ class RuntimeState:
for a in aids_unknown:
log.info(" unknown: %s (EF.DIR)" % a)
else:
log.warning("EF.DIR seems to be empty!")
log.warn("EF.DIR seems to be empty!")
# Some card applications may not be registered in EF.DIR, we will actively
# probe for those applications
@@ -557,8 +556,8 @@ class RuntimeLchan:
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
(data_len, writeable_name, writeable_size, data_len - writeable_size))
elif data_len < writeable_size:
log.warning("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
def update_binary(self, data_hex: str, offset: int = 0):
"""Update transparent EF binary data.

View File

@@ -3,6 +3,18 @@
""" pySim: PCSC reader transport link base
"""
import os
import abc
import argparse
from typing import Optional, Tuple
from construct import Construct
from osmocom.utils import b2h, h2b, i2h, Hexstr
from pySim.exceptions import *
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
#
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
# Copyright (C) 2021-2023 Harald Welte <laforge@osmocom.org>
#
@@ -18,20 +30,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import abc
import argparse
from typing import Optional, Tuple
from construct import Construct
from osmocom.utils import b2h, h2b, i2h, Hexstr
from pySim.exceptions import *
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
from pySim.log import PySimLogger
log = PySimLogger.get(__name__)
class ApduTracer:
def trace_command(self, cmd):
@@ -46,11 +46,11 @@ class ApduTracer:
class StdoutApduTracer(ApduTracer):
"""Minimalistic APDU tracer, printing commands to stdout."""
def trace_response(self, cmd, sw, resp):
log.info("-> %s %s", cmd[:10], cmd[10:])
log.info("<- %s: %s", sw, resp)
print("-> %s %s" % (cmd[:10], cmd[10:]))
print("<- %s: %s" % (sw, resp))
def trace_reset(self):
log.info("-- RESET")
print("-- RESET")
class ProactiveHandler(abc.ABC):
"""Abstract base class representing the interface of some code that handles
@@ -177,7 +177,7 @@ class LinkBase(abc.ABC):
if self.apdu_strict:
raise ValueError(exeption_str)
else:
log.warning(exeption_str)
print('Warning: %s' % exeption_str)
return (data, sw)
@@ -211,7 +211,7 @@ class LinkBase(abc.ABC):
# parse the proactive command
pcmd = ProactiveCommand()
parsed = pcmd.from_tlv(h2b(fetch_rv[0]))
log.info("FETCH: %s (%s)", fetch_rv[0], type(parsed).__name__)
print("FETCH: %s (%s)" % (fetch_rv[0], type(parsed).__name__))
if self.proactive_handler:
# Extension point: If this does return a list of TLV objects,
# they could be appended after the Result; if the first is a
@@ -361,13 +361,13 @@ def init_reader(opts, **kwargs) -> LinkBase:
from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs)
else: # Serial reader is default
log.warning("No reader/driver specified; falling back to default (Serial reader)")
print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink
sl = SerialSimLink(opts, **kwargs)
if os.environ.get('PYSIM_INTEGRATION_TEST') == "1":
log.info("Using %s reader interface" % (sl.name))
print("Using %s reader interface" % (sl.name))
else:
log.info("Using reader %s" % sl)
print("Using reader %s" % sl)
return sl

View File

@@ -166,7 +166,7 @@ class ModemATCommandLink(LinkBaseTpdu):
# Make sure that the response has format: b'+CSIM: %d,\"%s\"'
try:
result = re.match(rb'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
(_rsp_tpdu_len, rsp_tpdu) = result.groups()
except Exception as exc:
raise ReaderError('Failed to parse response from modem: %s' % rsp) from exc

View File

@@ -486,17 +486,17 @@ class EF_UST(EF_UServiceTable):
# TS 31.103 Section 4.2.7 - *not* the same as DF.GSM/EF.ECC!
class EF_ECC(LinFixedEF):
_test_de_encode = [
( '19f1ff01', { "call_code": "911",
( '19f1ff01', { "call_code": "911f",
"service_category": { "police": True, "ambulance": False, "fire_brigade": False,
"marine_guard": False, "mountain_rescue": False,
"manual_ecall": False, "automatic_ecall": False } } ),
( '19f3ff02', { "call_code": "913",
( '19f3ff02', { "call_code": "913f",
"service_category": { "police": False, "ambulance": True, "fire_brigade": False,
"marine_guard": False, "mountain_rescue": False,
"manual_ecall": False, "automatic_ecall": False } } ),
]
_test_no_pad = True
cc_construct = PaddedBcdAdapter(Rpad(Bytes(3)))
cc_construct = BcdAdapter(Rpad(Bytes(3)))
category_construct = FlagsEnum(Byte, police=1, ambulance=2, fire_brigade=3, marine_guard=4,
mountain_rescue=5, manual_ecall=6, automatic_ecall=7)
alpha_construct = GsmOrUcs2Adapter(Rpad(GreedyBytes))
@@ -596,7 +596,7 @@ class EF_ICI(CyclicEF):
self._construct = Struct('alpha_id'/Bytes(this._.total_len-28),
'len_of_bcd_contents'/Int8ub,
'ton_npi'/Int8ub,
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))),
'call_number'/BcdAdapter(Bytes(10)),
'cap_cfg2_record_id'/Int8ub,
'ext5_record_id'/Int8ub,
'date_and_time'/BcdAdapter(Bytes(7)),
@@ -612,7 +612,7 @@ class EF_OCI(CyclicEF):
self._construct = Struct('alpha_id'/Bytes(this._.total_len-27),
'len_of_bcd_contents'/Int8ub,
'ton_npi'/Int8ub,
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))),
'call_number'/BcdAdapter(Bytes(10)),
'cap_cfg2_record_id'/Int8ub,
'ext5_record_id'/Int8ub,
'date_and_time'/BcdAdapter(Bytes(7)),
@@ -1118,7 +1118,7 @@ class EF_Routing_Indicator(TransparentEF):
# responsibility of home network operator but BCD coding shall be used. If a network
# operator decides to assign less than 4 digits to Routing Indicator, the remaining digits
# shall be coded as "1111" to fill the 4 digits coding of Routing Indicator
self._construct = Struct('routing_indicator'/PaddedBcdAdapter(Rpad(Bytes(2))),
self._construct = Struct('routing_indicator'/Rpad(BcdAdapter(Bytes(2)), 'f', 2),
'rfu'/Bytes(2))
# TS 31.102 Section 4.4.11.13 (Rel 16)

View File

@@ -40,7 +40,6 @@ from osmocom.utils import *
from osmocom.construct import *
from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_plmn, dec_xplmn_w_act
from pySim.utils import bytes_for_nibbles
from pySim.profile import CardProfile, CardProfileAddon
from pySim.filesystem import *
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
@@ -152,7 +151,7 @@ class EF_ADN(LinFixedEF):
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
'len_of_bcd'/Int8ub,
'ton_npi'/TonNpi,
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))),
'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
'cap_conf_id'/Int8ub,
ext_name/Int8ub)
@@ -193,11 +192,11 @@ class EF_MSISDN(LinFixedEF):
( 'ffffffffffffffffffffffffffffffffffffffff04b12143f5ffffffffffffffffff',
{"alpha_id": "", "len_of_bcd": 4, "ton_npi": {"ext": True, "type_of_number": "network_specific",
"numbering_plan_id": "isdn_e164"},
"dialing_nr": "12345"}),
"dialing_nr": "12345f"}),
( '456967656e65205275666e756d6d6572ffffffff0891947172199181f3ffffffffff',
{"alpha_id": "Eigene Rufnummer", "len_of_bcd": 8, "ton_npi": {"ext": True, "type_of_number": "international",
"numbering_plan_id": "isdn_e164"},
"dialing_nr": "4917279119183"}),
"dialing_nr": "4917279119183f"}),
]
# Ensure deprecated representations still work
@@ -215,7 +214,7 @@ class EF_MSISDN(LinFixedEF):
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
'len_of_bcd'/Int8ub,
'ton_npi'/TonNpi,
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))),
'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
Padding(2, pattern=b'\xff'))
# Maintain compatibility with deprecated representations
@@ -240,20 +239,11 @@ class EF_MSISDN(LinFixedEF):
# TS 51.011 Section 10.5.6
class EF_SMSP(LinFixedEF):
_test_de_encode = [
( '534d5343ffffffffffffffffffffffffe1ffffffffffffffffffffffff0891945197109099f9ffffff0000a9',
{ "alpha_id": "SMSC", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" },
"call_number": "" },
"tp_sc_addr": { "length": 8, "ton_npi": { "ext": True, "type_of_number": "international",
"numbering_plan_id": "isdn_e164" },
"call_number": "4915790109999" },
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
# FIXME: re-encode fails / missing alpha_id at start of output
_test_decode = [
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": False },
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" },
"call_number": "" },
@@ -287,26 +277,12 @@ class EF_SMSP(LinFixedEF):
else:
raise ValueError
@staticmethod
def sc_addr_len(ctx):
"""Compute the length field for an address field (like TP-DestAddr or TP-ScAddr)."""
if not hasattr(ctx, 'call_number') or len(ctx.call_number) == 0:
return 0xff
else:
return bytes_for_nibbles(len(ctx.call_number)) + 1
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
ScAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.sc_addr_len(ctx)),
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
'parameter_indicators'/InvertAdapter(BitStruct(
Const(7, BitsInteger(3)),
'tp_vp'/Flag,
'tp_dcs'/Flag,
'tp_pid'/Flag,
'tp_sc_addr'/Flag,
'tp_dest_addr'/Flag)),
'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
tp_pid=3, tp_dcs=4, tp_vp=5)),
'tp_dest_addr'/ScAddr,
'tp_sc_addr'/ScAddr,
@@ -661,12 +637,12 @@ class EF_AD(TransparentEF):
# TS 51.011 Section 10.3.20 / 10.3.22
class EF_VGCS(TransRecEF):
_test_de_encode = [
( "92f9ffff", "299" ),
( "92f9ffff", "299fffff" ),
]
def __init__(self, fid='6fb1', sfid=None, name='EF.VGCS', size=(4, 200), rec_len=4,
desc='Voice Group Call Service', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len, **kwargs)
self._construct = PaddedBcdAdapter(Rpad(Bytes(4)))
self._construct = BcdAdapter(Bytes(4))
# TS 51.011 Section 10.3.21 / 10.3.23
class EF_VGCSS(TransparentEF):

View File

@@ -526,13 +526,6 @@ def expand_hex(hexstring, length):
# no change
return hexstring
def bytes_for_nibbles(num_nibbles: int) -> int:
"""compute the number of bytes needed to store the given number of nibbles."""
n_bytes = num_nibbles // 2
if num_nibbles & 1:
n_bytes += 1
return n_bytes
def boxed_heading_str(heading, width=80):
"""Generate a string that contains a boxed heading."""
@@ -631,17 +624,15 @@ def decomposeATR(atr_txt):
Returns:
dictionary of field and values
Example::
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
{ 'T0': {'value': 167},
'TB': {1: {'value': 0}},
'TC': {2: {'value': 24}},
'TD': {1: {'value': 64}},
'TS': {'value': 59},
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'hbn': 7}
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
{ 'T0': {'value': 167},
'TB': {1: {'value': 0}},
'TC': {2: {'value': 24}},
'TD': {1: {'value': 64}},
'TS': {'value': 59},
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'hbn': 7}
"""
ATR_PROTOCOL_TYPE_T0 = 0
atr_txt = normalizeATR(atr_txt)

View File

@@ -5,7 +5,7 @@ cmd2>=2.6.2,<3.0
jsonpath-ng
construct>=2.10.70
bidict
pyosmocom>=0.0.12
pyosmocom>=0.0.9
pyyaml>=5.1
termcolor
colorlog
@@ -15,4 +15,4 @@ git+https://github.com/osmocom/asn1tools
packaging
git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
smpplib
psycopg2-binary

View File

@@ -25,7 +25,7 @@ setup(
"jsonpath-ng",
"construct >= 2.10.70",
"bidict",
"pyosmocom >= 0.0.12",
"pyosmocom >= 0.0.9",
"pyyaml >= 5.1",
"termcolor",
"colorlog",
@@ -34,6 +34,7 @@ setup(
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
"asn1tools",
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
"psycopg2-binary"
],
scripts=[
'pySim-prog.py',
@@ -55,10 +56,6 @@ setup(
"service-identity",
"pyopenssl",
"requests",
"smpplib",
],
"CardKeyProviderPgsql": [
"psycopg2-binary",
]
},
)

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000111
GID1: ffffffffffffffff
GID2: ffffffffffffffff
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSC: 0015555
SPN: Fairwaves
Show in HPLMN: False
Hide in OPLMN: False

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSC: 0015555
SPN: wavemobile
Show in HPLMN: False
Hide in OPLMN: False

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic
Show in HPLMN: True
Hide in OPLMN: False

View File

@@ -2,7 +2,7 @@
# Utility to verify the functionality of pySim-prog.py
#
# (C) 2018 by sysmocom - s.f.m.c. GmbH
# (C) 2018 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic
Show in HPLMN: True
Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic
Show in HPLMN: True
Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: ffffffffffffffffffff
GID2: ffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Magic
Show in HPLMN: True
Hide in OPLMN: True

View File

@@ -6,7 +6,6 @@ IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Not available
Show in HPLMN: False
Hide in OPLMN: False

View File

@@ -15,7 +15,7 @@
},
{
"profile_info": {
"iccid": "8949449999999990031",
"iccid": "8949449999999990031f",
"isdp_aid": "a0000005591010ffffffff8900001200",
"profile_state": "disabled",
"service_provider_name": "OsmocomSPN",

View File

@@ -23,7 +23,7 @@ import os
import json
from utils import *
# This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031)
# This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031f)
# installed, and in disabled state. Also the profile must be installed in such a way that notifications are
# generated when the profile is disabled or enabled (ProfileMetadata)
@@ -56,7 +56,7 @@ class test_case(UnittestUtils):
self.runPySimShell(cardname, "test_enable_disable_profile.script")
self.assertEqualFiles("enable_disable_profile.tmp")
def test_set_nickname(self):
def test_enable_disable_profile(self):
cardname = 'sysmoEUICC1-C2T'
self.runPySimShell(cardname, "test_set_nickname.script")

View File

@@ -3,9 +3,6 @@ set echo true
select ADF.ISD-R
# Ensure that the test-profile we intend to test with is actually enabled
enable_profile --iccid 89000123456789012341
# by ICCID (pre-installed test profile on sysmoEUICC1-C2T)
disable_profile --iccid 89000123456789012341 > enable_disable_profile.tmp
enable_profile --iccid 89000123456789012341 >> enable_disable_profile.tmp

View File

@@ -3,11 +3,6 @@ set echo true
select ADF.ISD-R
# Ensure that the test-profile is actually enabled. (In case te test-profile
# was disabled, a notification may be generated. The testcase should tolerate
# that)
enable_profile --iccid 89000123456789012341
# Generate two (additional) notifications by quickly enabeling the test profile
enable_profile --iccid 8949449999999990031
enable_profile --iccid 8949449999999990031f
enable_profile --iccid 89000123456789012341

View File

@@ -1,10 +1,5 @@
set debug true
set echo true
# The output of get_profiles_info will also include the "profile_state", which
# can be either "enabled" or "disabled". Ensure that the correct profile is
# enabled.
enable_profile --iccid 89000123456789012341
select ADF.ISD-R
get_profiles_info > get_profiles_info.tmp

View File

@@ -19,7 +19,7 @@
"type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension"
},
"dialing_nr": "1234567",
"dialing_nr": "123456",
"cap_conf_id": 42,
"ext4_record_id": 23
},
@@ -67,7 +67,7 @@
"type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension"
},
"dialing_nr": "1234567",
"dialing_nr": "123456",
"cap_conf_id": 42,
"ext4_record_id": 23
},
@@ -127,7 +127,7 @@
"type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension"
},
"dialing_nr": "1234567",
"dialing_nr": "123456",
"cap_conf_id": 42,
"ext4_record_id": 23
}
@@ -140,7 +140,7 @@
"type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension"
},
"dialing_nr": "1234567",
"dialing_nr": "123456",
"cap_conf_id": 42,
"ext4_record_id": 23
}

View File

@@ -1,9 +0,0 @@
# Card parameter:
ICCID="8949440000001155314"
KIC='51D4FC44BCBA7C4589DFADA3297720AF'
KID='0449699C472CE71E2FB7B56245EF7684'
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
TAR='B00010'
APDU='A0A40000027F20A0C0000016'
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'

View File

@@ -1,156 +0,0 @@
#!/bin/bash
# Utility to verify the functionality of pySim-trace.py
#
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
PYSIM_SMPP2SIM=./pySim-smpp2sim.py
PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log
PYSIM_SMPP2SIM_PORT=2775
PYSIM_SMPP2SIM_TIMEOUT=10
PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py
PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log
PYSIM_SHELL=./pySim-shell.py
function dump_logs {
echo ""
echo "$PYSIM_SMPPOTATOOL_LOG"
echo "------------8<------------"
cat $PYSIM_SMPPOTATOOL_LOG
echo "------------8<------------"
echo ""
echo "$PYSIM_SMPP2SIM_LOG"
echo "------------8<------------"
cat $PYSIM_SMPP2SIM_LOG
echo "------------8<------------"
}
function send_test_request {
echo ""
echo "Sending request to SMPP server:"
TAR=$1
C_APDU=$2
R_APDU_EXPECTED=$3
echo "Sending: $C_APDU"
COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --tar $TAR --apdu $C_APDU"
echo "Commandline: $COMMANDLINE"
R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG`
if [ $? -ne 0 ]; then
echo "Unable to send request! -- failed!"
dump_logs
exit 1
fi
echo "Got response from SMPP server:"
echo "Sent: $C_APDU"
echo "Received: $R_APDU"
echo "Expected: $R_APDU_EXPECTED"
if [ "$R_APDU" != "$R_APDU_EXPECTED" ]; then
echo "Response does not match the expected response! -- failed!"
dump_logs
exit 1
fi
echo "Response matches the expected response -- success!"
echo ""
}
function start_smpp_server {
PCSC_READER=$1
# Start the SMPP server
echo ""
echo "Starting SMPP server:"
COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port $PYSIM_SMPP2SIM_PORT --apdu-trace"
echo "Commandline: $COMMANDLINE"
$COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 &
PYSIM_SMPP2SIM_PID=$!
trap 'kill $PYSIM_SMPP2SIM_PID' EXIT
echo "SMPP server started (PID=$PYSIM_SMPP2SIM_PID)"
# Wait until the SMPP server is reachable
RC=1
RETRY_COUNT=0
while [ $RC -ne 0 ]; do
nc -z localhost $PYSIM_SMPP2SIM_PORT
RC=$?
((RETRY_COUNT++))
if [ $RETRY_COUNT -gt $PYSIM_SMPP2SIM_TIMEOUT ]; then
echo "SMPP server not reachable (port=$PYSIM_SMPP2SIM_PORT) -- abort"
dump_logs
exit 1
fi
sleep 1
done
echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)"
}
function find_card_by_iccid {
# Find reader number of the card
ICCID=$1
echo ""
echo "Searching for card:"
echo "ICCID: \"$ICCID\""
if [ -z "$ICCID" ]; then
echo "invalid ICCID, zero length ICCID is not allowed! -- abort"
exit 1
fi
PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
echo "probing card in reader $PCSC_READER ..."
EF_ICCID_DECODED=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e 'select EF.ICCID' -e 'read_binary_decoded --oneline' 2> /dev/null | tail -1`
echo $EF_ICCID_DECODED | grep $ICCID > /dev/null
if [ $? -eq 0 ]; then
echo "Found card in reader $PCSC_READER"
return $PCSC_READER
fi
done
echo "Card with ICCID \"$ICCID\" not found -- abort"
exit 1
}
export PYTHONPATH=./
echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py"
echo "=============================================================="
# TODO: At the moment we can only have one card and one testcase. This is
# sufficient for now. We can extend this later as needed.
# Read test parameters from config from file
TEST_CONFIG_FILE=${0%.*}.cfg
echo "using config file: $TEST_CONFIG_FILE"
if ! [ -e "$TEST_CONFIG_FILE" ]; then
echo "test configuration file does not exist! -- abort"
exit 1
fi
. $TEST_CONFIG_FILE
# Execute testcase
find_card_by_iccid $ICCID
start_smpp_server $?
send_test_request $TAR $APDU "$EXPECTED_RESPONSE"

View File

@@ -2,7 +2,7 @@
# Utility to verify the functionality of pySim-trace.py
#
# (C) 2023 by sysmocom - s.f.m.c. GmbH
# (C) 2023 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier

View File

@@ -143,7 +143,7 @@ CardReset(3b9f96801f878031e073fe211b674a4c753034054ba9)
===============================
00 SEARCH RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [2], "sw": "9000"}}
===============================
00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_vp": true, "tp_dcs": true, "tp_pid": true, "tp_sc_addr": true, "tp_dest_addr": false}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5}
00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_dest_addr": false, "tp_sc_addr": true, "tp_pid": true, "tp_dcs": true, "tp_vp": true}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555f"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5}
===============================
00 SEARCH RECORD MF/ADF.USIM/EF.SMS 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "00ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], "sw": "9000"}}
===============================

View File

@@ -1 +0,0 @@
../../smdpp-data

View File

@@ -1,448 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: Neels Hofmeyr
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import io
import sys
import unittest
import io
from importlib import resources
from osmocom.utils import hexstr
from pySim.esim.saip import ProfileElementSequence
import pySim.esim.saip.personalization as p13n
import smdpp_data.upp
import xo
update_expected_output = False
def valstr(val):
if isinstance(val, io.BytesIO):
val = val.getvalue()
if isinstance(val, bytearray):
val = bytes(val)
return f'{val!r}'
def valtypestr(val):
if isinstance(val, dict):
types = []
for v in val.values():
types.append(f'{type(v).__name__}')
val_type = '{' + ', '.join(types) + '}'
else:
val_type = f'{type(val).__name__}'
return f'{valstr(val)}:{val_type}'
class ConfigurableParameterTest(unittest.TestCase):
def test_parameters(self):
upp_fnames = (
'TS48v5_SAIP2.1A_NoBERTLV.der',
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
'TS48v5_SAIP2.1B_NoBERTLV.der',
'TS48v5_SAIP2.3_NoBERTLV.der',
)
class Paramtest:
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
self.param_cls = param_cls
self.val = val
self.expect_clean_val = expect_clean_val
self.expect_val = expect_val
param_tests = [
Paramtest(param_cls=p13n.Imsi, val='123456',
expect_clean_val=str('123456'),
expect_val={'IMSI': hexstr('123456'),
'IMSI-ACC': '0040'}),
Paramtest(param_cls=p13n.Imsi, val=int(123456),
expect_val={'IMSI': hexstr('123456'),
'IMSI-ACC': '0040'}),
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
expect_clean_val=str('123456789012345'),
expect_val={'IMSI': hexstr('123456789012345'),
'IMSI-ACC': '0020'}),
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
expect_val={'IMSI': hexstr('123456789012345'),
'IMSI-ACC': '0020'}),
Paramtest(param_cls=p13n.Puk1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Puk1,
val=int(12345678),
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Puk2,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Pin1,
val='1234',
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Pin1,
val='123456',
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Pin1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Pin1,
val=int(1234),
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Pin1,
val=int(123456),
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Pin1,
val=int(12345678),
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Adm1,
val='1234',
expect_clean_val=b'1234\xff\xff\xff\xff',
expect_val='1234'),
Paramtest(param_cls=p13n.Adm1,
val='123456',
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.Adm1,
val='12345678',
expect_clean_val=b'12345678',
expect_val='12345678'),
Paramtest(param_cls=p13n.Adm1,
val=int(123456),
expect_clean_val=b'123456\xff\xff',
expect_val='123456'),
Paramtest(param_cls=p13n.AlgorithmID,
val='Milenage',
expect_clean_val=1,
expect_val='Milenage'),
Paramtest(param_cls=p13n.AlgorithmID,
val='TUAK',
expect_clean_val=2,
expect_val='TUAK'),
Paramtest(param_cls=p13n.AlgorithmID,
val='usim-test',
expect_clean_val=3,
expect_val='usim-test'),
Paramtest(param_cls=p13n.AlgorithmID,
val=1,
expect_clean_val=1,
expect_val='Milenage'),
Paramtest(param_cls=p13n.AlgorithmID,
val=2,
expect_clean_val=2,
expect_val='TUAK'),
Paramtest(param_cls=p13n.AlgorithmID,
val=3,
expect_clean_val=3,
expect_val='usim-test'),
Paramtest(param_cls=p13n.K,
val='01020304050607080910111213141516',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.K,
val=int(11020304050607080910111213141516),
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='11020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val='01020304050607080910111213141516',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.Opc,
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
expect_val='01020304050607080910111213141516'),
Paramtest(param_cls=p13n.SmspTpScAddr,
val='+1234567',
expect_clean_val=(True, '1234567'),
expect_val='+1234567'),
Paramtest(param_cls=p13n.SmspTpScAddr,
val=1234567,
expect_clean_val=(False, '1234567'),
expect_val='1234567'),
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
val='123',
expect_clean_val=123,
expect_val='123'),
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
val=123,
expect_clean_val=123,
expect_val='123'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val='0a 0b 0c 01 02',
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val=b'\x0a\x0b\x0c\x01\x02',
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageRotationConstants,
val=bytearray(b'\x0a\x0b\x0c\x01\x02'),
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
expect_val='0a0b0c0102'),
Paramtest(param_cls=p13n.MilenageXoringConstants,
val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
' bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
' cccccccccccccccccccccccccccccccc'
' 11111111111111111111111111111111'
' 22222222222222222222222222222222',
expect_clean_val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
'cccccccccccccccccccccccccccccccc'
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
Paramtest(param_cls=p13n.MilenageXoringConstants,
val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_clean_val=b'\xaa' * 16
+ b'\xbb' * 16
+ b'\xcc' * 16
+ b'\x11' * 16
+ b'\x22' * 16,
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
'cccccccccccccccccccccccccccccccc'
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
]
for sdkey_cls in (
# thin out the number of tests, as a compromise between completeness and test runtime
p13n.SdKeyScp02Kvn20AesDek,
#p13n.SdKeyScp02Kvn20AesEnc,
#p13n.SdKeyScp02Kvn20AesMac,
#p13n.SdKeyScp02Kvn21AesDek,
p13n.SdKeyScp02Kvn21AesEnc,
#p13n.SdKeyScp02Kvn21AesMac,
#p13n.SdKeyScp02Kvn22AesDek,
#p13n.SdKeyScp02Kvn22AesEnc,
p13n.SdKeyScp02Kvn22AesMac,
#p13n.SdKeyScp02KvnffAesDek,
#p13n.SdKeyScp02KvnffAesEnc,
#p13n.SdKeyScp02KvnffAesMac,
p13n.SdKeyScp03Kvn30AesDek,
#p13n.SdKeyScp03Kvn30AesEnc,
#p13n.SdKeyScp03Kvn30AesMac,
#p13n.SdKeyScp03Kvn31AesDek,
p13n.SdKeyScp03Kvn31AesEnc,
#p13n.SdKeyScp03Kvn31AesMac,
#p13n.SdKeyScp03Kvn32AesDek,
#p13n.SdKeyScp03Kvn32AesEnc,
p13n.SdKeyScp03Kvn32AesMac,
#p13n.SdKeyScp80Kvn01AesDek,
#p13n.SdKeyScp80Kvn01AesEnc,
#p13n.SdKeyScp80Kvn01AesMac,
p13n.SdKeyScp80Kvn01DesDek,
#p13n.SdKeyScp80Kvn01DesEnc,
#p13n.SdKeyScp80Kvn01DesMac,
#p13n.SdKeyScp80Kvn02AesDek,
p13n.SdKeyScp80Kvn02AesEnc,
#p13n.SdKeyScp80Kvn02AesMac,
#p13n.SdKeyScp80Kvn02DesDek,
#p13n.SdKeyScp80Kvn02DesEnc,
p13n.SdKeyScp80Kvn02DesMac,
#p13n.SdKeyScp80Kvn03AesDek,
#p13n.SdKeyScp80Kvn03AesEnc,
#p13n.SdKeyScp80Kvn03AesMac,
p13n.SdKeyScp80Kvn03DesDek,
#p13n.SdKeyScp80Kvn03DesEnc,
#p13n.SdKeyScp80Kvn03DesMac,
p13n.SdKeyScp81Kvn40Dek ,
#p13n.SdKeyScp81Kvn40Tlspsk,
#p13n.SdKeyScp81Kvn41Dek ,
p13n.SdKeyScp81Kvn41Tlspsk,
#p13n.SdKeyScp81Kvn42Dek ,
#p13n.SdKeyScp81Kvn42Tlspsk,
):
for key_len in sdkey_cls.allow_len:
val = '0102030405060708091011121314151617181920212223242526272829303132'
expect_clean_val = (b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32')
expect_val = '0102030405060708091011121314151617181920212223242526272829303132'
val = val[:key_len*2]
expect_clean_val = expect_clean_val[:key_len]
expect_val = val
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test bytes input
val = expect_clean_val
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test bytearray input
val = bytearray(expect_clean_val)
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
# test BytesIO input
val = io.BytesIO(expect_clean_val)
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
if key_len == 16:
# test huge integer input.
# needs to start with nonzero.. stupid
val = 11020304050607080910111213141516
expect_clean_val = (b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16')
expect_val = '11020304050607080910111213141516'
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
outputs = []
for upp_fname in upp_fnames:
test_idx = -1
try:
der = resources.read_binary(smdpp_data.upp, upp_fname)
for t in param_tests:
test_idx += 1
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
param = None
try:
param = t.param_cls()
param.input_value = t.val
param.validate()
except ValueError as e:
raise ValueError(f'{logloc}: {e}') from e
clean_val = param.value
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
raise ValueError(f'{logloc}: expected'
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
# pes = copy.deepcopy(orig_pes)
pes = ProfileElementSequence.from_der(der)
try:
param.apply(pes)
except ValueError as e:
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
changed_der = pes.to_der()
pes2 = ProfileElementSequence.from_der(changed_der)
read_back_val = t.param_cls.get_value_from_pes(pes2)
# compose log string to show the precise type of dict values
if isinstance(read_back_val, dict):
types = set()
for v in read_back_val.values():
types.add(f'{type(v).__name__}')
read_back_val_type = '{' + ', '.join(types) + '}'
else:
read_back_val_type = f'{type(read_back_val).__name__}'
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
expect_val = t.expect_val
if not isinstance(expect_val, dict):
expect_val = { t.param_cls.get_name(): expect_val }
if read_back_val != expect_val:
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
ok = logloc.replace(' clean_val', '\n\tclean_val'
).replace(' read_back_val', '\n\tread_back_val'
).replace('=', '=\t'
)
output = f'\nok: {ok}'
outputs.append(output)
print(output)
except Exception as e:
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
output = '\n'.join(outputs) + '\n'
xo_name = 'test_configurable_parameters'
if update_expected_output:
with resources.path(xo, xo_name) as xo_path:
with open(xo_path, 'w', encoding='utf-8') as f:
f.write(output)
else:
xo_str = resources.read_text(xo, xo_name)
if xo_str != output:
at = 0
while at < len(output):
if output[at] == xo_str[at]:
at += 1
continue
break
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
if __name__ == "__main__":
if '-u' in sys.argv:
update_expected_output = True
sys.argv.remove('-u')
unittest.main()

View File

@@ -21,7 +21,7 @@ import copy
from osmocom.utils import h2b, b2h
from pySim.esim.saip import *
from pySim.esim.saip import personalization
from pySim.esim.saip.personalization import *
from pprint import pprint as pp
@@ -55,56 +55,14 @@ class SaipTest(unittest.TestCase):
def test_personalization(self):
"""Test some of the personalization operations."""
pes = copy.deepcopy(self.pes)
params = [personalization.Puk1('01234567'),
personalization.Puk2(98765432),
personalization.Pin1('1111'),
personalization.Pin2(2222),
personalization.Adm1('11111111'),
personalization.K(h2b('000102030405060708090a0b0c0d0e0f')),
personalization.Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
params = [Puk1('01234567'), Puk2(98765432), Pin1('1111'), Pin2(2222), Adm1('11111111'),
K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
for p in params:
p.validate()
p.apply(pes)
# TODO: we don't actually test the results here, but we just verify there is no exception
pes.to_der()
def test_personalization2(self):
"""Test some of the personalization operations."""
cls = personalization.SdKeyScp80Kvn01DesEnc
pes = ProfileElementSequence.from_der(self.per_input)
prev_val = tuple(cls.get_values_from_pes(pes))
print(f'{prev_val=}')
self.assertTrue(prev_val)
set_val = '42342342342342342342342342342342'
param = cls(set_val)
param.validate()
param.apply(pes)
get_val1 = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1=} {set_val=}')
self.assertEqual(get_val1, ({cls.name: set_val},))
get_val1b = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1b=} {set_val=}')
self.assertEqual(get_val1b, ({cls.name: set_val},))
der = pes.to_der()
get_val1c = tuple(cls.get_values_from_pes(pes))
print(f'{get_val1c=} {set_val=}')
self.assertEqual(get_val1c, ({cls.name: set_val},))
# assertTrue to not dump the entire der.
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
# DER and the ConfigurableParameter subclass is buggy.
self.assertTrue(der != self.per_input)
pes2 = ProfileElementSequence.from_der(der)
get_val2 = tuple(cls.get_values_from_pes(pes2))
print(f'{get_val2=} {set_val=}')
self.assertEqual(get_val2, ({cls.name: set_val},))
def test_constructor_encode(self):
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
@@ -132,34 +90,5 @@ class OidTest(unittest.TestCase):
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
class NonMatchTest(unittest.TestCase):
def test_nonmatch(self):
# non-matches before, in between and after matches
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
nm_list = NonMatch.from_matchlist(match_list, 26)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
NonMatch(a=24, b=24, size=2)])
def test_nonmatch_beg(self):
# single match at beginning
match_list = [Match(a=0, b=0, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 20)
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
def test_nonmatch_end(self):
# single match at end
match_list = [Match(a=19, b=19, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
def test_nonmatch_none(self):
# no match at all
match_list = []
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
if __name__ == "__main__":
unittest.main()

View File

@@ -96,7 +96,7 @@ class LinFixed_Test(unittest.TestCase):
inst = c()
encoded, rec_num, decoded = self._parse_t(t)
logging.debug("Testing encode of %s", name)
re_enc = inst.encode_record_hex(decoded, rec_num, len(encoded)//2)
re_enc = inst.encode_record_hex(decoded, rec_num)
self.assertEqual(encoded.upper(), re_enc.upper())
def test_de_encode_record(self):
@@ -122,7 +122,7 @@ class LinFixed_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec)
# re-encode the decoded data
logging.debug("Testing re-encode of %s", name)
re_enc = inst.encode_record_hex(re_dec, rec_num, len(encoded)//2)
re_enc = inst.encode_record_hex(re_dec, rec_num)
self.assertEqual(encoded.upper(), re_enc.upper())
if hasattr(c, '_test_no_pad') and c._test_no_pad:
continue
@@ -196,7 +196,7 @@ class TransRecEF_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec)
# re-encode the decoded data
logging.debug("Testing re-encode of %s", name)
re_enc = inst.encode_record_hex(re_dec, len(encoded)//2)
re_enc = inst.encode_record_hex(re_dec)
self.assertEqual(encoded.upper(), re_enc.upper())
# there's no point in testing padded input, as TransRecEF have a fixed record
# size and we cannot ever receive more input data than that size.
@@ -256,8 +256,8 @@ class TransparentEF_Test(unittest.TestCase):
encoded = t[0]
decoded = t[1]
logging.debug("Testing encode of %s", name)
re_enc = inst.encode_hex(decoded, len(encoded)//2)
self.assertEqual(encoded, re_enc)
re_dec = inst.decode_hex(encoded)
self.assertEqual(decoded, re_dec)
def test_de_encode_file(self):
"""Test the decoder and encoder for a transparent EF. Performs first a decoder
@@ -280,7 +280,7 @@ class TransparentEF_Test(unittest.TestCase):
self.assertEqual(decoded, re_dec)
logging.debug("Testing re-encode of %s", name)
re_dec = inst.decode_hex(encoded)
re_enc = inst.encode_hex(re_dec, len(encoded)//2)
re_enc = inst.encode_hex(re_dec)
self.assertEqual(encoded.upper(), re_enc.upper())
if hasattr(c, '_test_no_pad') and c._test_no_pad:
continue

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
@@ -25,7 +25,7 @@ import io
import sys
from inspect import currentframe, getframeinfo
log = PySimLogger.get(__name__)
log = PySimLogger.get("TEST")
TEST_MSG_DEBUG = "this is a debug message"
TEST_MSG_INFO = "this is an info message"
@@ -82,15 +82,15 @@ class PySimLogger_Test(unittest.TestCase):
PySimLogger.setup(self._test_print_callback)
PySimLogger.set_verbose(True)
frame = currentframe()
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- DEBUG: " + TEST_MSG_DEBUG
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- INFO: " + TEST_MSG_INFO
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- WARNING: " + TEST_MSG_WARNING
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- ERROR: " + TEST_MSG_ERROR
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- CRITICAL: " + TEST_MSG_CRITICAL
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
def test_04_level(self):

View File

@@ -1,216 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
#
# Author: Neels Hofmeyr
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import math
from importlib import resources
import unittest
from pySim.esim.saip import param_source
import xo
update_expected_output = False
class D:
mandatory = set()
optional = set()
def __init__(self, **kwargs):
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
for k, v in kwargs.items():
setattr(self, k, v)
for k in self.optional:
if not hasattr(self, k):
setattr(self, k, None)
decimals = '0123456789'
hexadecimals = '0123456789abcdefABCDEF'
class FakeRandom:
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
i = 0
@classmethod
def next(cls):
cls.i = (cls.i + 1) % len(cls.vals)
return cls.vals[cls.i]
@staticmethod
def randint(a, b):
d = b - a
n_bytes = math.ceil(math.log(d, 2))
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
return a + (r % (b - a))
@staticmethod
def randbytes(n):
return bytes(FakeRandom.next() for i in range(n))
class ParamSourceTest(unittest.TestCase):
def test_param_source(self):
class ParamSourceTest(D):
mandatory = (
'param_source',
'n',
'expect',
)
optional = (
'expect_arg',
'csv_rows',
)
def expect_const(t, vals):
return tuple(t.expect_arg) == tuple(vals)
def expect_random(t, vals):
chars = t.expect_arg.get('digits')
repetitions = (t.n - len(set(vals)))
if repetitions:
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
for val_i in range(len(vals)):
v = vals[val_i]
val_minlen = t.expect_arg.get('val_minlen')
val_maxlen = t.expect_arg.get('val_maxlen')
if len(v) < val_minlen or len(v) > val_maxlen:
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
f' {val_minlen}..{val_maxlen}')
if chars is not None and not all(c in chars for c in v):
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
return True
param_source_tests = [
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
n=3,
expect=expect_const,
expect_arg=('123', '123', '123')
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
n=3,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 5,
'val_maxlen': 5,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 1,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 3,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
n=3,
expect=expect_const,
expect_arg=('10001', '10002', '10003')
),
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
n=3,
expect=expect_const,
expect_arg=('first val', 'second val', 'third val'),
csv_rows=(
{'column_name': 'first val',},
{'column_name': 'second val',},
{'column_name': 'third val',},
)
),
]
outputs = []
for t in param_source_tests:
try:
if hasattr(t.param_source, 'random_impl'):
t.param_source.random_impl = FakeRandom
vals = []
for i in range(t.n):
csv_row = None
if t.csv_rows is not None:
csv_row = t.csv_rows[i]
vals.append( t.param_source.get_next(csv_row=csv_row) )
if not t.expect(t, vals):
raise RuntimeError(f'invalid values returned: returned {vals}')
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
outputs.append(output)
print(output)
except RuntimeError as e:
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
output = '\n'.join(outputs) + '\n'
xo_name = 'test_param_src'
if update_expected_output:
with resources.path(xo, xo_name) as xo_path:
with open(xo_path, 'w', encoding='utf-8') as f:
f.write(output)
else:
xo_str = resources.read_text(xo, xo_name)
if xo_str != output:
at = 0
while at < len(output):
if output[at] == xo_str[at]:
at += 1
continue
break
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
if __name__ == "__main__":
if '-u' in sys.argv:
update_expected_output = True
sys.argv.remove('-u')
unittest.main()

View File

@@ -79,30 +79,6 @@ class DecTestCase(unittest.TestCase):
def testDecMNCfromPLMN_unused_str(self):
self.assertEqual(utils.dec_mnc_from_plmn_str("00f0ff"), "")
def testEncImsi(self):
#Type IMSI, odd number of identity digits
self.assertEqual(utils.enc_imsi("228062800000208"), "082982608200002080")
self.assertEqual(utils.enc_imsi("001010000123456"), "080910100000214365")
self.assertEqual(utils.enc_imsi("0010100001234"), "0709101000002143ff")
#Type IMSI, even number of identity digits
self.assertEqual(utils.enc_imsi("22806280000028"), "0821826082000020f8")
self.assertEqual(utils.enc_imsi("00101000012345"), "0801101000002143f5")
self.assertEqual(utils.enc_imsi("001010000123"), "07011010000021f3ff")
def testDecImsi(self):
#Type IMSI, odd number of identity digits
self.assertEqual(utils.dec_imsi("082982608200002080"), "228062800000208")
self.assertEqual(utils.dec_imsi("080910100000214365"), "001010000123456")
self.assertEqual(utils.dec_imsi("0709101000002143ff"), "0010100001234")
self.assertEqual(utils.dec_imsi("0709101000002143"), "0010100001234")
#Type IMSI, even number of identity digits
self.assertEqual(utils.dec_imsi("0821826082000020f8"), "22806280000028")
self.assertEqual(utils.dec_imsi("0801101000002143f5"), "00101000012345")
self.assertEqual(utils.dec_imsi("07011010000021f3ff"), "001010000123")
self.assertEqual(utils.dec_imsi("07011010000021f3"), "001010000123")
def test_enc_plmn(self):
with self.subTest("2-digit MCC"):
self.assertEqual(utils.enc_plmn("001", "01F"), "00F110")

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +0,0 @@
ok: ConstantSource vals=['123', '123', '123']
ok: RandomDigitSource vals=['13987', '49298', '55670']
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
ok: IncDigitSource vals=['10001', '10002', '10003']
ok: CsvSource vals=['first val', 'second val', 'third val']