Compare commits

..

7 Commits

Author SHA1 Message Date
Eric Wild
4e27b5107b smdpp: proper checks, proper check order, test mode
Change-Id: Ie4db65594b2eaf6c95ffc5e73ff9ae61c4d9d3a3
2025-06-25 10:25:35 +02:00
Eric Wild
0d24f35776 memory backed ephermal session store for easy concurrent runs
Change-Id: I05bfd6ff471ccf1c8c2b5f2b748b9d4125ddd4f7
2025-06-25 10:22:42 +02:00
Eric Wild
89e6e0b0bc smdpp: fix asn1tool OBJECT IDENTIFIER decoding
Change-Id: Ic678e6c4a4c1a01de87a8dce26f4a5e452e8562a
2025-06-25 10:22:42 +02:00
Eric Wild
3196f2fadf smdpp: add proper brp cert support
Change-Id: I6906732f7d193a9c2234075f4a82df5e0ed46100
2025-06-25 10:22:42 +02:00
Eric Wild
f98b1a0080 smdpp: verify cert chain
Change-Id: I1e4e4b1b032dc6a8b7d15bd80d533a50fe0cff15
2025-06-25 10:22:42 +02:00
Eric Wild
dc5fdd34bf x509 cert: fix weird cert check
Change-Id: I18beab0e1b24579724704c4141a2c457b2d4cf99
2025-06-25 10:22:42 +02:00
Eric Wild
67995146eb smdpp: less verbose by default
Those data blobs are huge.

Change-Id: I04a72b8f52417862d4dcba1f0743700dd942ef49
2025-06-25 10:22:42 +02:00
70 changed files with 1148 additions and 1735 deletions

View File

@@ -100,7 +100,6 @@ Please install the following dependencies:
- pyyaml >= 5.1 - pyyaml >= 5.1
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`) - smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
- termcolor - termcolor
- psycopg2-binary
Example for Debian: Example for Debian:
```sh ```sh

View File

@@ -24,12 +24,20 @@ import argparse
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h, Hexstr from osmocom.utils import h2b, b2h, Hexstr
from pySim.card_key_provider import CardKeyFieldCryptor from pySim.card_key_provider import CardKeyProviderCsv
class CsvColumnEncryptor(CardKeyFieldCryptor): def dict_keys_to_upper(d: dict) -> dict:
return {k.upper():v for k,v in d.items()}
class CsvColumnEncryptor:
def __init__(self, filename: str, transport_keys: dict): def __init__(self, filename: str, transport_keys: dict):
self.filename = filename self.filename = filename
self.crypt = CardKeyFieldCryptor(transport_keys) self.transport_keys = dict_keys_to_upper(transport_keys)
def encrypt_col(self, colname:str, value: str) -> Hexstr:
key = self.transport_keys[colname]
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
return b2h(cipher.encrypt(h2b(value)))
def encrypt(self) -> None: def encrypt(self) -> None:
with open(self.filename, 'r') as infile: with open(self.filename, 'r') as infile:
@@ -41,8 +49,9 @@ class CsvColumnEncryptor(CardKeyFieldCryptor):
cw.writeheader() cw.writeheader()
for row in cr: for row in cr:
for fieldname in cr.fieldnames: for key_colname in self.transport_keys:
row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname]) if key_colname in row:
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
cw.writerow(row) cw.writerow(row)
if __name__ == "__main__": if __name__ == "__main__":
@@ -62,5 +71,9 @@ if __name__ == "__main__":
print("You must specify at least one key!") print("You must specify at least one key!")
sys.exit(1) sys.exit(1)
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
for name, key in csv_column_keys.items():
print("Encrypting column %s using AES key %s" % (name, key))
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys) cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
cce.encrypt() cce.encrypt()

View File

@@ -1,286 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import csv
import sys
import yaml
import psycopg2
from psycopg2.sql import Identifier, SQL
from pathlib import Path
from pySim.log import PySimLogger
from packaging import version
log = PySimLogger.get("CSV2PGQSL")
class CardKeyDatabase:
def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False):
"""
Initialize database connection and set the table which shall be used as storage for the card key data.
In case the specified table does not exist yet it can be created using the create_table_type parameter.
New tables are always minimal tables which follow a pre-defined table scheme. The user may extend the table
with additional columns using the add_cols() later.
Args:
tablename : name of the database table to create.
create_table_type : type of the table to create ('UICC' or 'EUICC')
"""
def user_from_config_file(config, role: str) -> tuple[str, str]:
db_users = config.get('db_users')
user = db_users.get(role)
if user is None:
raise ValueError("user for role '%s' not set up in config file." % role)
return user.get('name'), user.get('pass')
log = PySimLogger.get("PQSQL")
self.table = table_name
self.cols = None
# Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys".
# This convention will allow us to deduct the table type from the table name.
if "euicc_keys" not in table_name and "uicc_keys" not in table_name:
raise ValueError("Table name (%s) should contain the substring \"uicc_keys\" or \"euicc_keys\"" % table_name)
# Read config file
log.info("Using config file: %s", config_filename)
with open(config_filename, "r") as cfg:
config = yaml.load(cfg, Loader=yaml.FullLoader)
host = config.get('host')
log.info("Database host: %s", host)
db_name = config.get('db_name')
log.info("Database name: %s", db_name)
table_names = config.get('table_names')
username_admin, password_admin = user_from_config_file(config, 'admin')
username_importer, password_importer = user_from_config_file(config, 'importer')
username_reader, _ = user_from_config_file(config, 'reader')
# Switch between admin and importer user
if admin:
username, password = username_admin, password_admin
else:
username, password = username_importer, password_importer
# Create database connection
log.info("Database user: %s", username)
self.conn = psycopg2.connect(dbname=db_name, user=username, password=password, host=host)
self.cur = self.conn.cursor()
# In the context of this tool it is not relevant if the table name is present in the config file. However,
# pySim-shell.py will require the table name to be configured properly to access the database table.
if self.table not in table_names:
log.warning("Specified table name (%s) is not yet present in config file (required for access from pySim-shell.py)",
self.table)
# Create a new minimal database table of the specified table type.
if create_table:
if not admin:
raise ValueError("creation of new table refused, use option --admin and try again.")
if "euicc_keys" in self.table:
self.__create_table(username_reader, username_importer, ['EID'])
elif "uicc_keys" in self.table:
self.__create_table(username_reader, username_importer, ['ICCID', 'IMSI'])
# Ensure a table with the specified name exists
log.info("Database table: %s", self.table)
if self.get_cols() == []:
raise ValueError("Table name (%s) does not exist yet" % self.table)
log.info("Database table columns: %s", str(self.get_cols()))
def __create_table(self, user_reader:str, user_importer:str, cols:list[str]):
"""
Initialize a new table. New tables are always minimal tables with one primary key and additional index columns.
Non index-columns may be added later using method _update_cols().
"""
# Create table columns with primary key
query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table.lower()),
Identifier(cols[0].lower()))
for c in cols[1:]:
query += SQL(", {} VARCHAR").format(Identifier(c.lower()))
query += SQL(");")
self.cur.execute(query)
# Create indexes for all other columns
for c in cols[1:]:
self.cur.execute(query = SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()),
Identifier(self.table.lower()),
Identifier(c.lower())))
# Set permissions
self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_importer)))
self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table.lower()),
Identifier(user_reader)))
log.info("New database table created: %s", str(self.table.lower()))
def get_cols(self) -> list[str]:
"""
Get a list of all columns available in the current table scheme.
Returns:
list with column names (in uppercase) of the database table
"""
# Return cached col list if present
if self.cols:
return self.cols
# Request a list of current cols from the database
self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table.lower(),))
cols_result = self.cur.fetchall()
cols = []
for c in cols_result:
cols.append(c[0].upper())
self.cols = cols
return cols
def get_missing_cols(self, cols_expected:list[str]) -> list[str]:
"""
Check if the current table scheme lacks any of the given expected columns.
Returns:
list with the missing columns.
"""
cols_present = self.get_cols()
return list(set(cols_expected) - set(cols_present))
def add_cols(self, cols:list[str]):
"""
Update the current table scheme with additional columns. In case the updated columns are already exist, the
table schema is not changed.
Args:
table : name of the database table to alter
cols : list with updated colum names to add
"""
cols_missing = self.get_missing_cols(cols)
# Depending on the table type (see constructor), we either have a primary key 'ICCID' (for UICC data), or 'EID'
# (for eUICC data). Both table formats different types of data and have rather differen columns also. Let's
# prevent the excidentally mixing of both types.
if 'ICCID' in cols_missing:
raise ValueError("Table %s stores eUCCC key material, refusing to add UICC specific column 'ICCID'" % self.table)
if 'EID' in cols_missing:
raise ValueError("Table %s stores UCCC key material, refusing to add eUICC specific column 'EID'" % self.table)
# Add the missing columns to the table
self.cols = None
for c in cols_missing:
self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table.lower()),
Identifier(c.lower())))
def insert_row(self, row:dict[str, str]):
"""
Insert a new row into the database table.
Args:
row : dictionary with the colum names and their designated values
"""
# Check if the row is compatible with the current table scheme
cols_expected = list(row.keys())
cols_missing = self.get_missing_cols(cols_expected)
if cols_missing != []:
raise ValueError("table %s has incompatible format, the row %s contains unknown cols %s" %
(self.table, str(row), str(cols_missing)))
# Insert row into datbase table
row_keys = list(row.keys())
row_values = list(row.values())
query = SQL("INSERT INTO {} ").format(Identifier(self.table.lower()))
query += SQL("({} ").format(Identifier(row_keys[0].lower()))
for k in row_keys[1:]:
query += SQL(", {}").format(Identifier(k.lower()))
query += SQL(") VALUES (%s")
for v in row_values[1:]:
query += SQL(", %s")
query += SQL(");")
self.cur.execute(query, row_values)
def commit(self):
self.conn.commit()
log.info("Changes to table %s committed!", self.table)
def open_csv(opts: argparse.Namespace):
log.info("CSV file: %s", opts.csv)
csv_file = open(opts.csv, 'r')
cr = csv.DictReader(csv_file)
if not cr:
raise RuntimeError("could not open DictReader for CSV-File '%s'" % opts.csv)
cr.fieldnames = [field.upper() for field in cr.fieldnames]
log.info("CSV file columns: %s", str(cr.fieldnames))
return cr
def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase:
try:
db = CardKeyDatabase(opts.pqsql, opts.table_name, opts.create_table, opts.admin)
# Check CSV format against table schema, add missing columns
cols_missing = db.get_missing_cols(cr.fieldnames)
if cols_missing != [] and (opts.update_columns or opts.create_table):
log.info("Adding missing columns: %s", str(cols_missing))
db.add_cols(cols_missing)
cols_missing = db.get_missing_cols(cr.fieldnames)
# Make sure the table schema has no missing columns
if cols_missing != []:
log.error("Database table lacks CSV file columns: %s -- import aborted!", cols_missing)
sys.exit(2)
except Exception as e:
log.error(str(e).strip())
log.error("Database initialization aborted due to error!")
sys.exit(2)
return db
def import_from_csv(db: CardKeyDatabase, cr: csv.DictReader):
count = 0
for row in cr:
try:
db.insert_row(row)
count+=1
if count % 100 == 0:
log.info("CSV file import in progress, %d rows imported...", count)
except Exception as e:
log.error(str(e).strip())
log.error("CSV file import aborted due to error, no datasets committed!")
sys.exit(2)
log.info("CSV file import done, %d rows imported", count)
if __name__ == '__main__':
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
option_parser.add_argument('--pqsql', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)')
option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True)
option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True)
option_parser.add_argument("--update-columns", help="add missing table columns", action='store_true', default=False)
option_parser.add_argument("--create-table", action='store_true', help="create new card key table", default=False)
option_parser.add_argument("--admin", action='store_true', help="perform action as admin", default=False)
opts = option_parser.parse_args()
PySimLogger.setup(print, {logging.WARN: "\033[33m"})
if (opts.verbose):
PySimLogger.set_verbose(True)
PySimLogger.set_level(logging.DEBUG)
# Open CSV file
cr = open_csv(opts)
# Open database, create initial table, update column scheme
db = open_db(cr, opts)
# Progress with import
if not opts.admin:
import_from_csv(db, cr)
# Commit changes to the database
db.commit()

View File

@@ -24,7 +24,7 @@ ICCID_HELP='The ICCID of the eSIM that shall be made available'
MATCHID_HELP='MatchingID that shall be used by profile download' MATCHID_HELP='MatchingID that shall be used by profile download'
parser = argparse.ArgumentParser(description=""" parser = argparse.ArgumentParser(description="""
Utility to manually issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""") Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint') parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+') parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server') parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
@@ -63,7 +63,7 @@ if __name__ == '__main__':
data = {} data = {}
for k, v in vars(opts).items(): for k, v in vars(opts).items():
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']: if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
# remove keys from dict that should not end up in JSON... # remove keys from dict that shold not end up in JSON...
continue continue
if v is not None: if v is not None:
data[k] = v data[k] = v

View File

@@ -68,7 +68,7 @@ parser_dl.add_argument('--confirmation-code',
# notification # notification
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification') parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
parser_ntf.add_argument('operation', choices=['enable','disable','delete'], parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
help='Profile Management Operation whoise occurrence shall be notififed') help='Profile Management Opreation whoise occurrence shall be notififed')
parser_ntf.add_argument('--sequence-nr', type=int, required=True, parser_ntf.add_argument('--sequence-nr', type=int, required=True,
help='eUICC global notification sequence number') help='eUICC global notification sequence number')
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL') parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
@@ -123,8 +123,8 @@ class Es9pClient:
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(), 'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc, 'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
} }
if self.opts.iccid: if opts.iccid:
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid)) ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
if self.opts.operation == 'download': if self.opts.operation == 'download':
pird = { pird = {

View File

@@ -1,40 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
from osmocom.utils import h2b, swap_nibbles
from pySim.esim.es8p import ProfileMetadata
parser = argparse.ArgumentParser(description="""Utility program to generate profile metadata in the
StoreMetadataRequest format based on input values from the command line.""")
parser.add_argument('--iccid', required=True, help="ICCID of eSIM profile");
parser.add_argument('--spn', required=True, help="Service Provider Name");
parser.add_argument('--profile-name', required=True, help="eSIM Profile Name");
parser.add_argument('--profile-class', choices=['test', 'operational', 'provisioning'],
default='operational', help="Profile Class");
parser.add_argument('--outfile', required=True, help="Output File Name");
if __name__ == '__main__':
opts = parser.parse_args()
iccid_bin = h2b(swap_nibbles(opts.iccid))
pmd = ProfileMetadata(iccid_bin, spn=opts.spn, profile_name=opts.profile_name,
profile_class=opts.profile_class)
with open(opts.outfile, 'wb') as f:
f.write(pmd.gen_store_metadata_request())
print("Written StoreMetadataRequest to '%s'" % opts.outfile)

View File

@@ -82,6 +82,10 @@ case "$JOB_TYPE" in
pip install -r requirements.txt pip install -r requirements.txt
# XXX: workaround for https://github.com/python-cmd2/cmd2/issues/1414
# 2.4.3 was the last stable release not affected by this bug (OS#6776)
pip install cmd2==2.4.3
rm -rf docs/_build rm -rf docs/_build
make -C "docs" html latexpdf make -C "docs" html latexpdf

View File

@@ -56,7 +56,7 @@ parser_rpe.add_argument('--output-file', required=True, help='Output file name')
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification') parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type') parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
parser_rn = subparsers.add_parser('remove-naa', help='Remove specified NAAs from PE-Sequence') parser_rn = subparsers.add_parser('remove-naa', help='Remove speciifed NAAs from PE-Sequence')
parser_rn.add_argument('--output-file', required=True, help='Output file name') parser_rn.add_argument('--output-file', required=True, help='Output file name')
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove') parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
# TODO: add an --naa-index or the like, so only one given instance can be removed # TODO: add an --naa-index or the like, so only one given instance can be removed
@@ -329,7 +329,7 @@ def do_info(pes: ProfileElementSequence, opts):
print("Security domain Instance AID: %s" % b2h(sd.decoded['instance']['instanceAID'])) print("Security domain Instance AID: %s" % b2h(sd.decoded['instance']['instanceAID']))
# FIXME: 'applicationSpecificParametersC9' parsing to figure out enabled SCP # FIXME: 'applicationSpecificParametersC9' parsing to figure out enabled SCP
for key in sd.keys: for key in sd.keys:
print("\t%s" % repr(key)) print("\tKVN=0x%02x, KID=0x%02x, %s" % (key.key_version_number, key.key_identifier, key.key_components))
# RFM # RFM
print() print()

View File

@@ -27,5 +27,5 @@ PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH add-app-i
# Display the contents of the resulting application PE: # Display the contents of the resulting application PE:
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
# For an explanation of --uicc-toolkit-app-spec-pars, see: # For an explaination of --uicc-toolkit-app-spec-pars, see:
# ETSI TS 102 226, section 8.2.1.3.2.2.1 # ETSI TS 102 226, section 8.2.1.3.2.2.1

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# A more useful version of the 'unber' tool provided with asn1c: # A more useful verion of the 'unber' tool provided with asn1c:
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs # Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
import sys import sys

View File

@@ -1,4 +1,4 @@
Retrieving card-individual keys via CardKeyProvider Retrieving card-individual keys via CardKeyProvider
=================================================== ===================================================
When working with a batch of cards, or more than one card in general, it When working with a batch of cards, or more than one card in general, it
@@ -20,11 +20,9 @@ example develop your own CardKeyProvider that queries some kind of
database for the key material, or that uses a key derivation function to database for the key material, or that uses a key derivation function to
derive card-specific key material from a global master key. derive card-specific key material from a global master key.
pySim already includes two CardKeyProvider implementations. One to retrieve The only actual CardKeyProvider implementation included in pySim is the
key material from a CSV file (`CardKeyProviderCsv`) and a second one that allows `CardKeyProviderCsv` which retrieves the key material from a
to retrieve the key material from a PostgreSQL database (`CardKeyProviderPgsql`). [potentially encrypted] CSV file.
Both implementations equally implement a column encryption scheme that allows
to protect sensitive columns using a *transport key*
The CardKeyProviderCsv The CardKeyProviderCsv
@@ -42,215 +40,11 @@ of pySim-shell. If you do not specify a CSV file, pySim will attempt to
open a CSV file from the default location at open a CSV file from the default location at
`~/.osmocom/pysim/card_data.csv`, and use that, if it exists. `~/.osmocom/pysim/card_data.csv`, and use that, if it exists.
The `CardKeyProviderCsv` is suitable to manage small amounts of key material
locally. However, if your card inventory is very large and the key material
must be made available on multiple sites, the `CardKeyProviderPgsql` is the
better option.
The CardKeyProviderPqsql
------------------------
With the `CardKeyProviderPsql` you can use a PostgreSQL database as storage
medium. The implementation comes with a CSV importer tool that consumes the
same CSV files you would normally use with the `CardKeyProviderCsv`, so you
can just use your existing CSV files and import them into the database.
Setting up the database
^^^^^^^^^^^^^^^^^^^^^^^
From the perspective of the database, the `CardKeyProviderPsql` has only
minimal requirements. You do not have to create any tables in advance. An empty
database and at least one user that may create, alter and insert into tables is
sufficient. However, for increased reliability and as a protection against
incorrect operation, the `CardKeyProviderPsql` supports a hierarchical model
with three users (or roles):
* **admin**:
This should be the owner of the database. It is intended to be used for
administrative tasks like adding new tables or adding new columns to existing
tables. This user should not be used to insert new data into tables or to access
data from within pySim-shell using the `CardKeyProviderPsql`
* **importer**:
This user is used when feeding new data into an existing table. It should only
be able to insert new rows into existing tables. It should not be used for
administrative tasks or to access data from within pySim-shell using the
`CardKeyProviderPsql`
* **reader**:
To access data from within pySim shell using the `CardKeyProviderPsql` the
reader user is the correct one to use. This user should have no write access
to the database or any of the tables.
Creating a config file
^^^^^^^^^^^^^^^^^^^^^^
The default location for the config file is `~/.osmocom/pysim/card_data_pqsql.cfg`
The file uses `yaml` syntax and should look like the example below:
::
host: "127.0.0.1"
db_name: "my_database"
table_names:
- "uicc_keys"
- "euicc_keys"
db_users:
admin:
name: "my_admin_user"
pass: "my_admin_password"
importer:
name: "my_importer_user"
pass: "my_importer_password"
reader:
name: "my_reader_user"
pass: "my_reader_password"
This file is used by pySim-shell and by the importer tool. Both expect the file
in the aforementioned location. In case you want to store the file in a
different location you may use the `--pgsql` commandline option to provide a
custom config file path.
The hostname and the database name for the PostgreSQL database is set with the
`host` and `db_name` fields. The field `db_users` sets the user names and
passwords for each of the aforementioned users (or roles). In case only a single
admin user is used, all three entries may be populated with the same user name
and password (not recommended)
The field `table_names` sets the tables that the `CardKeyProviderPsql` shall
use to query to locate card key data. You can set up as many tables as you
want, `CardKeyProviderPsql` will query them in order, one by one until a
matching entry is found.
NOTE: In case you do not want to disclose the admin and the importer credentials
to pySim-shell you may remove those lines. pySim-shell will only require the
`reader` entry under `db_users`.
Using the Importer
^^^^^^^^^^^^^^^^^^
Before data can be imported, you must first create a database table. Tables
are created with the provided importer tool, which can be found under
`contrib/csv-to-pgsql.py`. This tool is used to create the database table and
read the data from the provided CSV file into the database.
As mentioned before, all CSV file formats that work with `CardKeyProviderCsv`
may be used. To demonstrate how the import process works, let's assume you want
to import a CSV file format that looks like the following example. Let's also
assume that you didn't get the Global Platform keys from your card vendor for
this batch of UICC cards, so your CSV file lacks the columns for those fields.
::
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1"
"card1","999700000000001","8900000000000000001","0001","1111","11111111","0101","01010101","11111111111111111111111111111111","11111111111111111111111111111111","11111111"
"card2","999700000000002","8900000000000000002","0002","2222","22222222","0202","02020202","22222222222222222222222222222222","22222222222222222222222222222222","22222222"
"card3","999700000000003","8900000000000000003","0003","3333","22222222","0303","03030303","33333333333333333333333333333333","33333333333333333333333333333333","33333333"
Since this is your first import, the database still lacks the table. To
instruct the importer to create a new table, you may use the `--create-table`
option. You also have to pick an appropriate name for the table. Any name may
be chosen as long as it contains the string `uicc_keys` or `euicc_keys`,
depending on the type of data (`UICC` or `eUICC`) you intend to store in the
table. The creation of the table is an administrative task and can only be done
with the `admin` user. The `admin` user is selected using the `--admin` switch.
::
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin
INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_admin_user
INFO: New database table created: uicc_keys
INFO: Database table: uicc_keys
INFO: Database table columns: ['ICCID', 'IMSI']
INFO: Adding missing columns: ['PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
INFO: Changes to table uicc_keys committed!
The importer has created a new table with the name `uicc_keys`. The table is
now ready to be filled with data.
::
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_01.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_importer_user
INFO: Database table: uicc_keys
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
INFO: CSV file import done, 3 rows imported
INFO: Changes to table uicc_keys committed!
A quick `SELECT * FROM uicc_keys;` at the PostgreSQL console should now display
the contents of the CSV file you have fed into the importer.
Let's now assume that with your next batch of UICC cards your vendor includes
the Global Platform keys so your CSV format changes. It may now look like this:
::
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1","scp02_dek_1","scp02_enc_1","scp02_mac_1"
"card4","999700000000004","8900000000000000004","0004","4444","44444444","0404","04040404","44444444444444444444444444444444","44444444444444444444444444444444","44444444","44444444444444444444444444444444","44444444444444444444444444444444","44444444444444444444444444444444"
"card5","999700000000005","8900000000000000005","0005","4444","55555555","0505","05050505","55555555555555555555555555555555","55555555555555555555555555555555","55555555","55555555555555555555555555555555","55555555555555555555555555555555","55555555555555555555555555555555"
"card6","999700000000006","8900000000000000006","0006","4444","66666666","0606","06060606","66666666666666666666666666666666","66666666666666666666666666666666","66666666","66666666666666666666666666666666","66666666666666666666666666666666","66666666666666666666666666666666"
When importing data from an updated CSV format the database table also has
to be updated. This is done using the `--update-columns` switch. Like when
creating new tables, this operation also requires admin privileges, so the
`--admin` switch is required again.
::
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin
INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_admin_user
INFO: Database table: uicc_keys
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
INFO: Adding missing columns: ['SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
INFO: Changes to table uicc_keys committed!
When the new table columns are added, the import may be continued like the
first one:
::
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys
INFO: CSV file: ./csv-to-pgsql_example_02.csv
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pqsql.cfg
INFO: Database host: 127.0.0.1
INFO: Database name: my_database
INFO: Database user: my_importer_user
INFO: Database table: uicc_keys
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC', 'SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
INFO: CSV file import done, 3 rows imported
INFO: Changes to table uicc_keys committed!
On the PostgreSQL console a `SELECT * FROM uicc_keys;` should now show the
imported data with the added columns. All important data should now also be
available from within pySim-shell via the `CardKeyProviderPgsql`.
Column-Level CSV encryption Column-Level CSV encryption
--------------------------- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
pySim supports column-level CSV encryption. This feature will make sure pySim supports column-level CSV encryption. This feature will make sure
that your key material is not stored in plaintext in the CSV file (or that your key material is not stored in plaintext in the CSV file.
database).
The encryption mechanism uses AES in CBC mode. You can use any key The encryption mechanism uses AES in CBC mode. You can use any key
length permitted by AES (128/192/256 bit). length permitted by AES (128/192/256 bit).
@@ -278,8 +72,6 @@ by all columns of the set:
* `SCP03_ISDA` is a group alias for `SCP03_ENC_ISDA`, `SCP03_MAC_ISDA`, `SCP03_DEK_ISDA` * `SCP03_ISDA` is a group alias for `SCP03_ENC_ISDA`, `SCP03_MAC_ISDA`, `SCP03_DEK_ISDA`
* `SCP03_ISDR` is a group alias for `SCP03_ENC_ISDR`, `SCP03_MAC_ISDR`, `SCP03_DEK_ISDR` * `SCP03_ISDR` is a group alias for `SCP03_ENC_ISDR`, `SCP03_MAC_ISDR`, `SCP03_DEK_ISDR`
NOTE: When using `CardKeyProviderPqsl`, the input CSV files must be encrypted
before import.
Field naming Field naming
------------ ------------
@@ -290,9 +82,9 @@ Field naming
* For look-up of eUICC specific key material (like SCP03 keys for the * For look-up of eUICC specific key material (like SCP03 keys for the
ISD-R, ECASD), pySim uses the `EID` field as lookup key. ISD-R, ECASD), pySim uses the `EID` field as lookup key.
As soon as the CardKeyProvider finds a line (row) in your CSV file As soon as the CardKeyProviderCsv finds a line (row) in your CSV where
(or database) where the ICCID or EID match, it looks for the column containing the ICCID or EID match, it looks for the column containing the requested
the requested data. data.
ADM PIN ADM PIN

View File

@@ -18,17 +18,9 @@ sys.path.insert(0, os.path.abspath('..'))
# -- Project information ----------------------------------------------------- # -- Project information -----------------------------------------------------
project = 'osmopysim-usermanual' project = 'osmopysim-usermanual'
copyright = '2009-2025 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta' copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta' author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
# manually as line break (https://github.com/sphinx-doc/sphinx/issues/6875)
latex_elements = {
"maketitle":
r"""\author{Sylvain Munaut, Harald Welte, Philipp Maier, \and Supreeth Herle, Merlin Chlosta}
\sphinxmaketitle
"""
}
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------

View File

@@ -49,7 +49,7 @@ Two modes are possible:
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
``--num`` remains unchanged. ``--num`` remains unchanged.
The parameter ``--num`` specifies a card individual number. This number will be managed into the random seed so that The parameter ``--num`` specifies a card individual number. This number will be manged into the random seed so that
it serves as an identifier for a particular set of randomly generated parameters. it serves as an identifier for a particular set of randomly generated parameters.
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
@@ -77,7 +77,7 @@ the parameter ``--type``. The following card types are supported:
Specifying the card reader: Specifying the card reader:
It is most common to use ``pySim-prog`` together with a PCSC reader. The PCSC reader number is specified via the It is most common to use ``pySim-prog`` together whith a PCSC reader. The PCSC reader number is specified via the
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use ``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
the ``--help`` option of ``pySim-prog`` for more information. the ``--help`` option of ``pySim-prog`` for more information.

View File

@@ -21,9 +21,9 @@ osmo-smdpp currently
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your * [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with matching eUICCs. certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with mathcing eUICCs.
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective * doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
of the EID or whether it was downloaded before. This is actually very useful for R&D and testing, as it of the EID or whether it was donwloaded before. This is actually very useful for R&D and testing, as it
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
production usage. production usage.
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in * doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
@@ -40,21 +40,16 @@ osmo-smdpp currently
Running osmo-smdpp Running osmo-smdpp
------------------ ------------------
osmo-smdpp comes with built-in TLS support which is enabled by default. However, it is always possible to osmo-smdpp does not have built-in TLS support as the used *twisted* framework appears to have
disable the built-in TLS support if needed. problems when using the example elliptic curve certificates (both NIST and Brainpool) from GSMA.
In order to use osmo-smdpp without the built-in TLS support, it has to be put behind a TLS reverse proxy,
which terminates the ES9+ HTTPS traffic from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
NOTE: The built in TLS support in osmo-smdpp makes use of the python *twisted* framework. Older versions
of this framework appear to have problems when using the example elliptic curve certificates (both NIST and
Brainpool) from GSMA.
So in order to use it, you have to put it behind a TLS reverse proxy, which terminates the ES9+
HTTPS from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
nginx as TLS proxy nginx as TLS proxy
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
If you chose to use `nginx` as TLS reverse proxy, you can use the following configuration snippet:: If you use `nginx` as web server, you can use the following configuration snippet::
upstream smdpp { upstream smdpp {
server localhost:8000; server localhost:8000;
@@ -97,43 +92,32 @@ The `smdpp-data/upp` directory contains the UPP (Unprotected Profile Package) us
commandline options commandline options
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
Typically, you just run osmo-smdpp without any arguments, and it will bind its built-in HTTPS ES9+ interface to Typically, you just run it without any arguments, and it will bind its plain-HTTP ES9+ interface to
`localhost` TCP port 443. In this case an external TLS reverse proxy is not needed. `localhost` TCP port 8000.
osmo-smdpp currently doesn't have any configuration file. osmo-smdpp currently doesn't have any configuration file.
There are command line options for binding: There are command line options for binding:
Bind the HTTPS ES9+ to a port other than 443:: Bind the HTTP ES9+ to a port other than 8000::
./osmo-smdpp.py -p 8443 ./osmo-smdpp.py -p 8001
Disable the built-in TLS support and bind the plain-HTTP ES9+ to a port 8000::
./osmo-smdpp.py -p 8000 --nossl
Bind the HTTP ES9+ to a different local interface:: Bind the HTTP ES9+ to a different local interface::
./osmo-smdpp.py -H 127.0.0.2 ./osmo-smdpp.py -H 127.0.0.1
DNS setup for your LPA DNS setup for your LPA
~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy. The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
It must also accept the TLS certificates used by your TLS proxy. In case osmo-smdpp is used with built-in TLS support, It must also accept the TLS certificates used by your TLS proxy.
it will use the certificates provided in smdpp-data.
NOTE: The HTTPS ES9+ interface cannot be addressed by the LPA directly via its IP address. The reason for this is that
the included SGP.26 (DPtls) test certificates explicitly restrict the hostname to `testsmdpplus1.example.com` in the
`X509v3 Subject Alternative Name` extension. Using a bare IP address as hostname may cause the certificate to be
rejected by the LPA.
Supported eUICC Supported eUICC
~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~
If you run osmo-smdpp with the included SGP.26 (DPauth, DPpb) certificates, you must use an eUICC with matching SGP.26 If you run osmo-smdpp with the included SGP.26 certificates, you must use an eUICC with matching SGP.26
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
in turn must be signed by that SGP.26 EUM certificate. in turn must be signed by that SGP.26 EUM certificate.

View File

@@ -75,7 +75,7 @@ The response body is a JSON document, either
#. key freshness failure #. key freshness failure
#. unspecified card error #. unspecified card error
Example (success): Example (succcess):
:: ::
{ {

View File

@@ -17,7 +17,7 @@ In any case, in order to operate a SUCI-enabled 5G SA network, you will have to
#. deploy the public key on your USIMs #. deploy the public key on your USIMs
#. deploy the private key on your 5GC, specifically the UDM function #. deploy the private key on your 5GC, specifically the UDM function
pysim contains (in its `contrib` directory) a small utility program that can make it easy to generate pysim contains (int its `contrib` directory) a small utility program that can make it easy to generate
such keys: `suci-keytool.py` such keys: `suci-keytool.py`
Generating keys Generating keys

View File

@@ -30,7 +30,7 @@ This guide covers the basic workflow of provisioning SIM cards with the 5G SUCI
For specific information on sysmocom SIM cards, refer to For specific information on sysmocom SIM cards, refer to
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the current * the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the curent
sysmoISIM-SJA5 product sysmoISIM-SJA5 product
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older * the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
sysmoISIM-SJA2 product sysmoISIM-SJA2 product

File diff suppressed because it is too large Load Diff

View File

@@ -586,7 +586,7 @@ def read_params_csv(opts, imsi=None, iccid=None):
else: else:
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False)) row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
# NOTE: We might consider to specify a new CSV field "mnclen" in our # NOTE: We might concider to specify a new CSV field "mnclen" in our
# CSV files for a better automatization. However, this only makes sense # CSV files for a better automatization. However, this only makes sense
# when the tools and databases we export our files from will also add # when the tools and databases we export our files from will also add
# such a field. # such a field.

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# #
# Utility to display some information about a SIM card # Utility to display some informations about a SIM card
# #
# #
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com> # Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>

View File

@@ -22,25 +22,19 @@ from typing import List, Optional
import json import json
import traceback import traceback
import re import re
import cmd2 import cmd2
from packaging import version from packaging import version
from cmd2 import style from cmd2 import style
import logging
from pySim.log import PySimLogger
from osmocom.utils import auto_uint8
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :( # cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
if version.parse(cmd2.__version__) < version.parse("2.3.0"): if version.parse(cmd2.__version__) < version.parse("2.3.0"):
from cmd2 import fg, bg # pylint: disable=no-name-in-module from cmd2 import fg, bg # pylint: disable=no-name-in-module
RED = fg.red RED = fg.red
YELLOW = fg.yellow
LIGHT_RED = fg.bright_red LIGHT_RED = fg.bright_red
LIGHT_GREEN = fg.bright_green LIGHT_GREEN = fg.bright_green
else: else:
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
RED = Fg.RED RED = Fg.RED
YELLOW = Fg.YELLOW
LIGHT_RED = Fg.LIGHT_RED LIGHT_RED = Fg.LIGHT_RED
LIGHT_GREEN = Fg.LIGHT_GREEN LIGHT_GREEN = Fg.LIGHT_GREEN
from cmd2 import CommandSet, with_default_category, with_argparser from cmd2 import CommandSet, with_default_category, with_argparser
@@ -69,12 +63,10 @@ from pySim.ts_102_222 import Ts102222Commands
from pySim.gsm_r import DF_EIRENE from pySim.gsm_r import DF_EIRENE
from pySim.cat import ProactiveCommand from pySim.cat import ProactiveCommand
from pySim.card_key_provider import CardKeyProviderCsv, CardKeyProviderPgsql from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
from pySim.app import init_card from pySim.app import init_card
log = PySimLogger.get("main")
class Cmd2Compat(cmd2.Cmd): class Cmd2Compat(cmd2.Cmd):
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer """Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
@@ -100,19 +92,15 @@ class PysimApp(Cmd2Compat):
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors (C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
def __init__(self, verbose, card, rs, sl, ch, script=None): def __init__(self, card, rs, sl, ch, script=None):
if version.parse(cmd2.__version__) < version.parse("2.0.0"): if version.parse(cmd2.__version__) < version.parse("2.0.0"):
kwargs = {'use_ipython': True} kwargs = {'use_ipython': True}
else: else:
kwargs = {'include_ipy': True} kwargs = {'include_ipy': True}
self.verbose = verbose
self._onchange_verbose('verbose', False, self.verbose);
# pylint: disable=unexpected-keyword-arg # pylint: disable=unexpected-keyword-arg
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False, super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
auto_load_commands=False, startup_script=script, **kwargs) auto_load_commands=False, startup_script=script, **kwargs)
PySimLogger.setup(self.poutput, {logging.WARN: YELLOW})
self.intro = style(self.BANNER, fg=RED) self.intro = style(self.BANNER, fg=RED)
self.default_category = 'pySim-shell built-in commands' self.default_category = 'pySim-shell built-in commands'
self.card = None self.card = None
@@ -138,9 +126,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
self.add_settable(Settable2Compat('apdu_strict', bool, self.add_settable(Settable2Compat('apdu_strict', bool,
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self, 'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
onchange_cb=self._onchange_apdu_strict)) onchange_cb=self._onchange_apdu_strict))
self.add_settable(Settable2Compat('verbose', bool,
'Enable/disable verbose logging', self,
onchange_cb=self._onchange_verbose))
self.equip(card, rs) self.equip(card, rs)
def equip(self, card, rs): def equip(self, card, rs):
@@ -225,13 +210,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
else: else:
self.card._scc._tp.apdu_strict = False self.card._scc._tp.apdu_strict = False
def _onchange_verbose(self, param_name, old, new):
PySimLogger.set_verbose(new)
if new == True:
PySimLogger.set_level(logging.DEBUG)
else:
PySimLogger.set_level(logging.INFO)
class Cmd2ApduTracer(ApduTracer): class Cmd2ApduTracer(ApduTracer):
def __init__(self, cmd2_app): def __init__(self, cmd2_app):
self.cmd2 = cmd2_app self.cmd2 = cmd2_app
@@ -287,7 +265,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
def do_apdu(self, opts): def do_apdu(self, opts):
"""Send a raw APDU to the card, and print SW + Response. """Send a raw APDU to the card, and print SW + Response.
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
tracked. Depending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select tracked. Dpending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
a different file.""" a different file."""
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be # When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
@@ -358,7 +336,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
def _process_card(self, first, script_path): def _process_card(self, first, script_path):
# Early phase of card initialization (this part may fail with an exception) # Early phase of card initialzation (this part may fail with an exception)
try: try:
rs, card = init_card(self.sl) rs, card = init_card(self.sl)
rc = self.equip(card, rs) rc = self.equip(card, rs)
@@ -399,7 +377,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
bulk_script_parser = argparse.ArgumentParser() bulk_script_parser = argparse.ArgumentParser()
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file") bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exception occurs', bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exeption occurs',
action='store_true') action='store_true')
bulk_script_parser.add_argument('--tries', type=int, default=2, bulk_script_parser.add_argument('--tries', type=int, default=2,
help='how many tries before trying the next card') help='how many tries before trying the next card')
@@ -499,23 +477,6 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
"""Echo (print) a string on the console""" """Echo (print) a string on the console"""
self.poutput(' '.join(opts.STRING)) self.poutput(' '.join(opts.STRING))
query_card_key_parser = argparse.ArgumentParser()
query_card_key_parser.add_argument('FIELDS', help="fields to query", type=str, nargs='+')
query_card_key_parser.add_argument('--key', help='lookup key (typically \'ICCID\' or \'EID\')',
type=str, required=True)
query_card_key_parser.add_argument('--value', help='lookup key match value (e.g \'8988211000000123456\')',
type=str, required=True)
@cmd2.with_argparser(query_card_key_parser)
@cmd2.with_category(CUSTOM_CATEGORY)
def do_query_card_key(self, opts):
"""Manually query the Card Key Provider"""
result = card_key_provider_get(opts.FIELDS, opts.key, opts.value)
self.poutput("Result:")
if result == {}:
self.poutput(" (none)")
for k in result.keys():
self.poutput(" %s: %s" % (str(k), str(result.get(k))))
@cmd2.with_category(CUSTOM_CATEGORY) @cmd2.with_category(CUSTOM_CATEGORY)
def do_version(self, opts): def do_version(self, opts):
"""Print the pySim software version.""" """Print the pySim software version."""
@@ -770,7 +731,7 @@ class PySimCommands(CommandSet):
body = {} body = {}
for t in tags: for t in tags:
result = self._cmd.lchan.retrieve_data(t) result = self._cmd.lchan.retrieve_data(t)
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0])) (tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
body[t] = b2h(val) body[t] = b2h(val)
else: else:
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename)) raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
@@ -954,53 +915,36 @@ class Iso7816Commands(CommandSet):
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid)) raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
return result return result
@staticmethod
def __select_pin_nr(pin_type:str, pin_nr:int) -> int:
if pin_type:
# pylint: disable=unsubscriptable-object
return pin_names.inverse[pin_type]
return pin_nr
@staticmethod
def __add_pin_nr_to_ArgumentParser(chv_parser):
group = chv_parser.add_mutually_exclusive_group()
group.add_argument('--pin-type',
choices=[x for x in pin_names.values()
if (x.startswith('PIN') or x.startswith('2PIN'))],
help='Specifiy pin type (default is PIN1)')
group.add_argument('--pin-nr', type=auto_uint8, default=0x01,
help='PIN Number, 1=PIN1, 0x81=2PIN1 or custom value (see also TS 102 221, Table 9.3")')
verify_chv_parser = argparse.ArgumentParser() verify_chv_parser = argparse.ArgumentParser()
verify_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal, verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(verify_chv_parser)
@cmd2.with_argparser(verify_chv_parser) @cmd2.with_argparser(verify_chv_parser)
def do_verify_chv(self, opts): def do_verify_chv(self, opts):
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications """Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
2PIN1 (see also TS 102 221 Section 9.5.1 / Table 9.3).""" PIN2."""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr) pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr)) (data, sw) = self._cmd.lchan.scc.verify_chv(opts.pin_nr, h2b(pin))
(data, sw) = self._cmd.lchan.scc.verify_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV verification successful") self._cmd.poutput("CHV verification successful")
unblock_chv_parser = argparse.ArgumentParser() unblock_chv_parser = argparse.ArgumentParser()
unblock_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal, unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
help='PUK code value. If none given, CSV file will be queried') help='PUK code value. If none given, CSV file will be queried')
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal, unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(unblock_chv_parser)
@cmd2.with_argparser(unblock_chv_parser) @cmd2.with_argparser(unblock_chv_parser)
def do_unblock_chv(self, opts): def do_unblock_chv(self, opts):
"""Unblock PIN code using specified PUK code""" """Unblock PIN code using specified PUK code"""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr) new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr)) puk = self.get_code(opts.PUK, "PUK" + str(opts.pin_nr))
puk = self.get_code(opts.PUK, "PUK" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.unblock_chv( (data, sw) = self._cmd.lchan.scc.unblock_chv(
pin_nr, h2b(puk), h2b(new_pin)) opts.pin_nr, h2b(puk), h2b(new_pin))
self._cmd.poutput("CHV unblock successful") self._cmd.poutput("CHV unblock successful")
change_chv_parser = argparse.ArgumentParser() change_chv_parser = argparse.ArgumentParser()
@@ -1008,42 +952,42 @@ class Iso7816Commands(CommandSet):
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal, change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(change_chv_parser) change_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
@cmd2.with_argparser(change_chv_parser) @cmd2.with_argparser(change_chv_parser)
def do_change_chv(self, opts): def do_change_chv(self, opts):
"""Change PIN code to a new PIN code""" """Change PIN code to a new PIN code"""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr) new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr)) pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.change_chv( (data, sw) = self._cmd.lchan.scc.change_chv(
pin_nr, h2b(pin), h2b(new_pin)) opts.pin_nr, h2b(pin), h2b(new_pin))
self._cmd.poutput("CHV change successful") self._cmd.poutput("CHV change successful")
disable_chv_parser = argparse.ArgumentParser() disable_chv_parser = argparse.ArgumentParser()
disable_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal, disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(disable_chv_parser)
@cmd2.with_argparser(disable_chv_parser) @cmd2.with_argparser(disable_chv_parser)
def do_disable_chv(self, opts): def do_disable_chv(self, opts):
"""Disable PIN code using specified PIN code""" """Disable PIN code using specified PIN code"""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr) pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr)) (data, sw) = self._cmd.lchan.scc.disable_chv(opts.pin_nr, h2b(pin))
(data, sw) = self._cmd.lchan.scc.disable_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV disable successful") self._cmd.poutput("CHV disable successful")
enable_chv_parser = argparse.ArgumentParser() enable_chv_parser = argparse.ArgumentParser()
__add_pin_nr_to_ArgumentParser(enable_chv_parser) enable_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal, enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried') help='PIN code value. If none given, CSV file will be queried')
@cmd2.with_argparser(enable_chv_parser) @cmd2.with_argparser(enable_chv_parser)
def do_enable_chv(self, opts): def do_enable_chv(self, opts):
"""Enable PIN code using specified PIN code""" """Enable PIN code using specified PIN code"""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr) pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr)) (data, sw) = self._cmd.lchan.scc.enable_chv(opts.pin_nr, h2b(pin))
(data, sw) = self._cmd.lchan.scc.enable_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV enable successful") self._cmd.poutput("CHV enable successful")
def do_deactivate_file(self, opts): def do_deactivate_file(self, opts):
@@ -1127,26 +1071,16 @@ argparse_add_reader_args(option_parser)
global_group = option_parser.add_argument_group('General Options') global_group = option_parser.add_argument_group('General Options')
global_group.add_argument('--script', metavar='PATH', default=None, global_group.add_argument('--script', metavar='PATH', default=None,
help='script with pySim-shell commands to be executed automatically at start-up') help='script with pySim-shell commands to be executed automatically at start-up')
global_group.add_argument('--csv', metavar='FILE',
default=None, help='Read card data from CSV file')
global_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-CSV-column AES transport key')
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE", global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
help="Use automatic card handling machine") help="Use automatic card handling machine")
global_group.add_argument("--noprompt", help="Run in non interactive mode", global_group.add_argument("--noprompt", help="Run in non interactive mode",
action='store_true', default=False) action='store_true', default=False)
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization", global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
action='store_true', default=False) action='store_true', default=False)
global_group.add_argument("--verbose", help="Enable verbose logging",
action='store_true', default=False)
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
card_key_group.add_argument('--csv', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file')
card_key_group.add_argument('--pqsql', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data_pqsql.cfg",
help='Read card data from PostgreSQL database (config file)')
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key')
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-column AES transport key', dest='column_key')
adm_group = global_group.add_mutually_exclusive_group() adm_group = global_group.add_mutually_exclusive_group()
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None, adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
@@ -1161,29 +1095,23 @@ option_parser.add_argument("command", nargs='?',
option_parser.add_argument('command_args', nargs=argparse.REMAINDER, option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
help="Optional Arguments for command") help="Optional Arguments for command")
if __name__ == '__main__': if __name__ == '__main__':
startup_errors = False startup_errors = False
opts = option_parser.parse_args() opts = option_parser.parse_args()
# Ensure that we are able to print formatted warnings from the beginning.
PySimLogger.setup(print, {logging.WARN: YELLOW})
if (opts.verbose):
PySimLogger.set_verbose(True)
PySimLogger.set_level(logging.DEBUG)
else:
PySimLogger.set_verbose(False)
PySimLogger.set_level(logging.INFO)
# Register csv-file as card data provider, either from specified CSV # Register csv-file as card data provider, either from specified CSV
# or from CSV file in home directory # or from CSV file in home directory
column_keys = {} csv_column_keys = {}
for par in opts.column_key: for par in opts.csv_column_key:
name, key = par.split(':') name, key = par.split(':')
column_keys[name] = key csv_column_keys[name] = key
if os.path.isfile(opts.csv): csv_default = str(Path.home()) + "/.osmocom/pysim/card_data.csv"
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys)) if opts.csv:
if os.path.isfile(opts.pqsql): card_key_provider_register(CardKeyProviderCsv(opts.csv, csv_column_keys))
card_key_provider_register(CardKeyProviderPgsql(opts.pqsql, column_keys)) if os.path.isfile(csv_default):
card_key_provider_register(CardKeyProviderCsv(csv_default, csv_column_keys))
# Init card reader driver # Init card reader driver
sl = init_reader(opts, proactive_handler = Proact()) sl = init_reader(opts, proactive_handler = Proact())
@@ -1199,7 +1127,7 @@ if __name__ == '__main__':
# able to tolerate and recover from that. # able to tolerate and recover from that.
try: try:
rs, card = init_card(sl, opts.skip_card_init) rs, card = init_card(sl, opts.skip_card_init)
app = PysimApp(opts.verbose, card, rs, sl, ch) app = PysimApp(card, rs, sl, ch)
except: except:
startup_errors = True startup_errors = True
print("Card initialization (%s) failed with an exception:" % str(sl)) print("Card initialization (%s) failed with an exception:" % str(sl))
@@ -1211,7 +1139,7 @@ if __name__ == '__main__':
print(" it should also be noted that some readers may behave strangely when no card") print(" it should also be noted that some readers may behave strangely when no card")
print(" is inserted.)") print(" is inserted.)")
print("") print("")
app = PysimApp(opts.verbose, None, None, sl, ch) app = PysimApp(None, None, sl, ch)
# If the user supplies an ADM PIN at via commandline args authenticate # If the user supplies an ADM PIN at via commandline args authenticate
# immediately so that the user does not have to use the shell commands # immediately so that the user does not have to use the shell commands

View File

@@ -84,7 +84,7 @@ def tcp_connected_callback(p: protocol.Protocol):
logger.error("%s: connected!" % p) logger.error("%s: connected!" % p)
class ProactChannel: class ProactChannel:
"""Representation of a single protective channel.""" """Representation of a single proective channel."""
def __init__(self, channels: 'ProactChannels', chan_nr: int): def __init__(self, channels: 'ProactChannels', chan_nr: int):
self.channels = channels self.channels = channels
self.chan_nr = chan_nr self.chan_nr = chan_nr

View File

@@ -151,7 +151,7 @@ global_group.add_argument('--no-suppress-select', action='store_false', dest='su
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status', global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
help=""" help="""
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
information that was not already received in response to the most recent SEELCT.""") information that was not already received in resposne to the most recent SEELCT.""")
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu', global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
help="""Show the raw APDU in addition to its parsed form.""") help="""Show the raw APDU in addition to its parsed form.""")
@@ -188,7 +188,7 @@ parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help=""" parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
Read APDUs from a TCA Loader log file.""") Read APDUs from a TCA Loader log file.""")
parser_tcaloader_log.add_argument('-f', '--log-file', required=True, parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
help='Name of the log file to be read') help='Name of te log file to be read')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -317,7 +317,7 @@ class ADF_ARAM(CardADF):
store_ref_ar_do_parse = argparse.ArgumentParser() store_ref_ar_do_parse = argparse.ArgumentParser()
# REF-DO # REF-DO
store_ref_ar_do_parse.add_argument( store_ref_ar_do_parse.add_argument(
'--device-app-id', required=True, help='Identifies the specific device application that the rule applies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)') '--device-app-id', required=True, help='Identifies the specific device application that the rule appplies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group() aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
aid_grp.add_argument( aid_grp.add_argument(
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)') '--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
@@ -399,7 +399,7 @@ class ADF_ARAM(CardADF):
sw_aram = { sw_aram = {
'ARA-M': { 'ARA-M': {
'6381': 'Rule successfully stored but an access rule already exists', '6381': 'Rule successfully stored but an access rule already exists',
'6382': 'Rule successfully stored but contained at least one unknown (discarded) BER-TLV', '6382': 'Rule successfully stored bu contained at least one unknown (discarded) BER-TLV',
'6581': 'Memory Problem', '6581': 'Memory Problem',
'6700': 'Wrong Length in Lc', '6700': 'Wrong Length in Lc',
'6981': 'DO is not supported by the ARA-M/ARA-C', '6981': 'DO is not supported by the ARA-M/ARA-C',

View File

@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
operation with pySim-shell. operation with pySim-shell.
""" """
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH # (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier, Harald Welte # Author: Philipp Maier, Harald Welte
@@ -31,225 +31,128 @@ operation with pySim-shell.
from typing import List, Dict, Optional from typing import List, Dict, Optional
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h from osmocom.utils import h2b, b2h
from pySim.log import PySimLogger
import abc import abc
import csv import csv
import logging
import yaml
import psycopg2
from psycopg2.sql import Identifier, SQL
log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider'] card_key_providers = [] # type: List['CardKeyProvider']
class CardKeyFieldCryptor: # well-known groups of columns relate to a given functionality. This avoids having
""" # to specify the same transport key N number of times, if the same key is used for multiple
A Card key field encryption class that may be used by Card key provider implementations to add support for # fields of one group, like KIC+KID+KID of one SD.
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.). CRYPT_GROUPS = {
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key" 'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt 'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
the key material on demand. 'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
""" 'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
# well-known groups of columns relate to a given functionality. This avoids having
# to specify the same transport key N number of times, if the same key is used for multiple
# fields of one group, like KIC+KID+KID of one SD.
__CRYPT_GROUPS = {
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
} }
__IV = b'\x23' * 16
@staticmethod
def __dict_keys_to_upper(d: dict) -> dict:
return {k.upper():v for k,v in d.items()}
@staticmethod
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
new_dict = {}
for name, key in transport_keys.items():
if name in crypt_groups:
for field in crypt_groups[name]:
new_dict[field] = key
else:
new_dict[name] = key
return new_dict
def __init__(self, transport_keys: dict):
"""
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
Args:
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
"""
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
self.__CRYPT_GROUPS)
for name, key in self.transport_keys.items():
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as plaintext and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return encrypted_val
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.decrypt(h2b(encrypted_val)))
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
"""
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as non sensitive and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return plaintext_val
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.encrypt(h2b(plaintext_val)))
class CardKeyProvider(abc.ABC): class CardKeyProvider(abc.ABC):
"""Base class, not containing any concrete implementation.""" """Base class, not containing any concrete implementation."""
@abc.abstractmethod VALID_KEY_FIELD_NAMES = ['ICCID', 'EID', 'IMSI' ]
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
""" # check input parameters, but do nothing concrete yet
Get multiple card-individual fields for identified card. This method should not fail with an exception in def _verify_get_data(self, fields: List[str] = [], key: str = 'ICCID', value: str = "") -> Dict[str, str]:
case the entry, columns or even the key column itsself is not found. """Verify multiple fields for identified card.
Args: Args:
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
key : look-up key to identify card data, such as 'ICCID' key : look-up key to identify card data, such as 'ICCID'
value : value for look-up key to identify card data value : value for look-up key to identify card data
Returns: Returns:
dictionary of {field : value, ...} strings for each requested field from 'fields'. In case nothing is dictionary of {field, value} strings for each requested field from 'fields'
fond None shall be returned. """
if key not in self.VALID_KEY_FIELD_NAMES:
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
(key, str(self.VALID_KEY_FIELD_NAMES)))
return {}
def get_field(self, field: str, key: str = 'ICCID', value: str = "") -> Optional[str]:
"""get a single field from CSV file using a specified key/value pair"""
fields = [field]
result = self.get(fields, key, value)
return result.get(field)
@abc.abstractmethod
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
"""Get multiple card-individual fields for identified card.
Args:
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
key : look-up key to identify card data, such as 'ICCID'
value : value for look-up key to identify card data
Returns:
dictionary of {field, value} strings for each requested field from 'fields'
""" """
def __str__(self):
return type(self).__name__
class CardKeyProviderCsv(CardKeyProvider): class CardKeyProviderCsv(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified CSV file.""" """Card key provider implementation that allows to query against a specified CSV file.
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
known as "transport key" (see GSMA FS.28)."""
IV = b'\x23' * 16
csv_file = None
filename = None
def __init__(self, csv_filename: str, transport_keys: dict): def __init__(self, filename: str, transport_keys: dict):
""" """
Args: Args:
csv_filename : file name (path) of CSV file containing card-individual key/data filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor) transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
""" """
log.info("Using CSV file as card key data source: %s" % csv_filename) self.csv_file = open(filename, 'r')
self.csv_file = open(csv_filename, 'r')
if not self.csv_file: if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % csv_filename) raise RuntimeError("Could not open CSV file '%s'" % filename)
self.csv_filename = csv_filename self.filename = filename
self.crypt = CardKeyFieldCryptor(transport_keys) self.transport_keys = self.process_transport_keys(transport_keys)
@staticmethod
def process_transport_keys(transport_keys: dict):
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
new_dict = {}
for name, key in transport_keys.items():
if name in CRYPT_GROUPS:
for field in CRYPT_GROUPS[name]:
new_dict[field] = key
else:
new_dict[name] = key
return new_dict
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""decrypt a single field, if we have a transport key for the field of that name."""
if not field_name in self.transport_keys:
return encrypted_val
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
return b2h(cipher.decrypt(h2b(encrypted_val)))
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]: def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
super()._verify_get_data(fields, key, value)
self.csv_file.seek(0) self.csv_file.seek(0)
cr = csv.DictReader(self.csv_file) cr = csv.DictReader(self.csv_file)
if not cr: if not cr:
raise RuntimeError("Could not open DictReader for CSV-File '%s'" % self.csv_filename) raise RuntimeError(
"Could not open DictReader for CSV-File '%s'" % self.filename)
cr.fieldnames = [field.upper() for field in cr.fieldnames] cr.fieldnames = [field.upper() for field in cr.fieldnames]
if key not in cr.fieldnames: rc = {}
return None
return_dict = {}
for row in cr: for row in cr:
if row[key] == value: if row[key] == value:
for f in fields: for f in fields:
if f in row: if f in row:
return_dict.update({f: self.crypt.decrypt_field(f, row[f])}) rc.update({f: self._decrypt_field(f, row[f])})
else: else:
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.csv_filename, f)) raise RuntimeError("CSV-File '%s' lacks column '%s'" %
if return_dict == {}: (self.filename, f))
return None return rc
return return_dict
class CardKeyProviderPgsql(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
def __init__(self, config_filename: str, transport_keys: dict):
"""
Args:
config_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
"""
log.info("Using SQL database as card key data source: %s" % config_filename)
with open(config_filename, "r") as cfg:
config = yaml.load(cfg, Loader=yaml.FullLoader)
log.info("Card key database name: %s" % config.get('db_name'))
db_users = config.get('db_users')
user = db_users.get('reader')
if user is None:
raise ValueError("user for role 'reader' not set up in config file.")
self.conn = psycopg2.connect(dbname=config.get('db_name'),
user=user.get('name'),
password=user.get('pass'),
host=config.get('host'))
self.tables = config.get('table_names')
log.info("Card key database tables: %s" % str(self.tables))
self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
db_result = None
for t in self.tables:
self.conn.rollback()
cur = self.conn.cursor()
# Make sure that the database table and the key column actually exists. If not, move on to the next table
cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (t,))
cols_result = cur.fetchall()
if cols_result == []:
log.warning("Card Key database seems to lack table %s, check config file!" % t)
continue
if (key.lower(),) not in cols_result:
continue
# Query requested columns from database table
query = SQL("SELECT {}").format(Identifier(fields[0].lower()))
for f in fields[1:]:
query += SQL(", {}").format(Identifier(f.lower()))
query += SQL(" FROM {} WHERE {} = %s LIMIT 1;").format(Identifier(t.lower()),
Identifier(key.lower()))
cur.execute(query, (value,))
db_result = cur.fetchone()
cur.close()
if db_result:
break
if db_result is None:
return None
result = dict(zip(fields, db_result))
for k in result.keys():
result[k] = self.crypt.decrypt_field(k, result.get(k))
return result
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers): def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
@@ -260,11 +163,11 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
provider_list : override the list of providers from the global default provider_list : override the list of providers from the global default
""" """
if not isinstance(provider, CardKeyProvider): if not isinstance(provider, CardKeyProvider):
raise ValueError("provider is not a card data provider") raise ValueError("provider is not a card data provier")
provider_list.append(provider) provider_list.append(provider)
def card_key_provider_get(fields: list[str], key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]: def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
"""Query all registered card data providers for card-individual [key] data. """Query all registered card data providers for card-individual [key] data.
Args: Args:
@@ -275,21 +178,17 @@ def card_key_provider_get(fields: list[str], key: str, value: str, provider_list
Returns: Returns:
dictionary of {field, value} strings for each requested field from 'fields' dictionary of {field, value} strings for each requested field from 'fields'
""" """
key = key.upper()
fields = [f.upper() for f in fields]
for p in provider_list: for p in provider_list:
if not isinstance(p, CardKeyProvider): if not isinstance(p, CardKeyProvider):
raise ValueError("Provider list contains element which is not a card data provider") raise ValueError(
log.debug("Searching for card key data (key=%s, value=%s, provider=%s)" % (key, value, str(p))) "provider list contains element which is not a card data provier")
result = p.get(fields, key, value) result = p.get(fields, key, value)
if result: if result:
log.debug("Found card data: %s" % (str(result)))
return result return result
return {}
raise ValueError("Unable to find card key data (key=%s, value=%s, fields=%s)" % (key, value, str(fields)))
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> str: def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> Optional[str]:
"""Query all registered card data providers for a single field. """Query all registered card data providers for a single field.
Args: Args:
@@ -300,7 +199,11 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
Returns: Returns:
dictionary of {field, value} strings for the requested field dictionary of {field, value} strings for the requested field
""" """
for p in provider_list:
fields = [field] if not isinstance(p, CardKeyProvider):
result = card_key_provider_get(fields, key, value, card_key_providers) raise ValueError(
return result.get(field.upper()) "provider list contains element which is not a card data provier")
result = p.get_field(field, key, value)
if result:
return result
return None

View File

@@ -112,8 +112,6 @@ class UiccCardBase(SimCardBase):
def probe(self) -> bool: def probe(self) -> bool:
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM # EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
ef_dir = EF_DIR() ef_dir = EF_DIR()
# select MF first
self.file_exists("3f00")
return self.file_exists(ef_dir.fid) return self.file_exists(ef_dir.fid)
def read_aids(self) -> List[Hexstr]: def read_aids(self) -> List[Hexstr]:

View File

@@ -316,19 +316,19 @@ class FileList(COMPR_TLV_IE, tag=0x92):
_construct = Struct('number_of_files'/Int8ub, _construct = Struct('number_of_files'/Int8ub,
'files'/GreedyRange(FileId)) 'files'/GreedyRange(FileId))
# TS 102 223 Section 8.19 # TS 102 223 Secton 8.19
class LocationInformation(COMPR_TLV_IE, tag=0x93): class LocationInformation(COMPR_TLV_IE, tag=0x93):
pass pass
# TS 102 223 Section 8.20 # TS 102 223 Secton 8.20
class IMEI(COMPR_TLV_IE, tag=0x94): class IMEI(COMPR_TLV_IE, tag=0x94):
_construct = BcdAdapter(GreedyBytes) _construct = BcdAdapter(GreedyBytes)
# TS 102 223 Section 8.21 # TS 102 223 Secton 8.21
class HelpRequest(COMPR_TLV_IE, tag=0x95): class HelpRequest(COMPR_TLV_IE, tag=0x95):
pass pass
# TS 102 223 Section 8.22 # TS 102 223 Secton 8.22
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96): class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
_construct = BcdAdapter(GreedyBytes) _construct = BcdAdapter(GreedyBytes)

View File

@@ -141,7 +141,7 @@ class SimCardCommands:
Returns: Returns:
Tuple of (decoded_data, sw) Tuple of (decoded_data, sw)
""" """
cmd = cmd_constr.build(cmd_data) if cmd_data else b'' cmd = cmd_constr.build(cmd_data) if cmd_data else ''
lc = i2h([len(cmd)]) if cmd_data else '' lc = i2h([len(cmd)]) if cmd_data else ''
le = '00' if resp_constr else '' le = '00' if resp_constr else ''
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le]) pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
@@ -285,7 +285,7 @@ class SimCardCommands:
return self.send_apdu_checksw(self.cla_byte + "a40304") return self.send_apdu_checksw(self.cla_byte + "a40304")
def select_adf(self, aid: Hexstr) -> ResTuple: def select_adf(self, aid: Hexstr) -> ResTuple:
"""Execute SELECT a given Application ADF. """Execute SELECT a given Applicaiton ADF.
Args: Args:
aid : application identifier as hex string aid : application identifier as hex string
@@ -577,7 +577,7 @@ class SimCardCommands:
Args: Args:
rand : 16 byte random data as hex string (RAND) rand : 16 byte random data as hex string (RAND)
autn : 8 byte Authentication Token (AUTN) autn : 8 byte Autentication Token (AUTN)
context : 16 byte random data ('3g' or 'gsm') context : 16 byte random data ('3g' or 'gsm')
""" """
# 3GPP TS 31.102 Section 7.1.2.1 # 3GPP TS 31.102 Section 7.1.2.1

View File

@@ -282,7 +282,7 @@ class BspInstance:
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes: def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
"""MAC a single plaintext TLV. Returns the protected ciphertext.""" """MAC a single plaintext TLV. Returns the protected ciphertext."""
assert tag <= 255 assert tag <= 255
assert len(plaintext) <= self.max_payload_size assert len(plaintext) < self.max_payload_size
maced = self.m_algo.auth(tag, plaintext) maced = self.m_algo.auth(tag, plaintext)
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only. # The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
self.c_algo.block_nr += 1 self.c_algo.block_nr += 1

View File

@@ -116,7 +116,7 @@ class param:
pass pass
class Es2PlusApiFunction(JsonHttpApiFunction): class Es2PlusApiFunction(JsonHttpApiFunction):
"""Base class for representing an ES2+ API Function.""" """Base classs for representing an ES2+ API Function."""
pass pass
# ES2+ DownloadOrder function (SGP.22 section 5.3.1) # ES2+ DownloadOrder function (SGP.22 section 5.3.1)

View File

@@ -76,11 +76,10 @@ def gen_replace_session_keys(ppk_enc: bytes, ppk_cmac: bytes, initial_mcv: bytes
class ProfileMetadata: class ProfileMetadata:
"""Representation of Profile metadata. Right now only the mandatory bits are """Representation of Profile metadata. Right now only the mandatory bits are
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3""" supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str, profile_class = 'operational'): def __init__(self, iccid_bin: bytes, spn: str, profile_name: str):
self.iccid_bin = iccid_bin self.iccid_bin = iccid_bin
self.spn = spn self.spn = spn
self.profile_name = profile_name self.profile_name = profile_name
self.profile_class = profile_class
self.icon = None self.icon = None
self.icon_type = None self.icon_type = None
self.notifications = [] self.notifications = []
@@ -106,14 +105,6 @@ class ProfileMetadata:
'serviceProviderName': self.spn, 'serviceProviderName': self.spn,
'profileName': self.profile_name, 'profileName': self.profile_name,
} }
if self.profile_class == 'test':
smr['profileClass'] = 0
elif self.profile_class == 'provisioning':
smr['profileClass'] = 1
elif self.profile_class == 'operational':
smr['profileClass'] = 2
else:
raise ValueError('Unsupported Profile Class %s' % self.profile_class)
if self.icon: if self.icon:
smr['icon'] = self.icon smr['icon'] = self.icon
smr['iconType'] = self.icon_type smr['iconType'] = self.icon_type
@@ -208,12 +199,12 @@ class BoundProfilePackage(ProfilePackage):
# 'initialiseSecureChannelRequest' # 'initialiseSecureChannelRequest'
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr) bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
# firstSequenceOf87 # firstSequenceOf87
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys") logger.debug(f"BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}") logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}") logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin)) bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
# sequenceOF88 # sequenceOF88
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys") logger.debug(f"BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin)) bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
if self.ppp: # we have to use session keys if self.ppp: # we have to use session keys

View File

@@ -1,4 +1,4 @@
"""GSMA eSIM RSP ES9+ interface according to SGP.22 v2.5""" """GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5"""
# (C) 2024 by Harald Welte <laforge@osmocom.org> # (C) 2024 by Harald Welte <laforge@osmocom.org>
# #

View File

@@ -159,7 +159,7 @@ class ApiError(Exception):
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
class JsonHttpApiFunction(abc.ABC): class JsonHttpApiFunction(abc.ABC):
"""Base class for representing an HTTP[s] API Function.""" """Base classs for representing an HTTP[s] API Function."""
# the below class variables are expected to be overridden in derived classes # the below class variables are expected to be overridden in derived classes
path = None path = None

View File

@@ -90,7 +90,7 @@ class RspSessionState:
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey? # FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
del state['_smdp_otsk'] del state['_smdp_otsk']
del state['_smdp_ot_curve'] del state['_smdp_ot_curve']
# automatically recover all the remaining state # automatically recover all the remainig state
self.__dict__.update(state) self.__dict__.update(state)

View File

@@ -183,7 +183,7 @@ class File:
self.file_type = template.file_type self.file_type = template.file_type
self.fid = template.fid self.fid = template.fid
self.sfi = template.sfi self.sfi = template.sfi
self.arr = template.arr.to_bytes(1, 'big') self.arr = template.arr.to_bytes(1)
if hasattr(template, 'rec_len'): if hasattr(template, 'rec_len'):
self.rec_len = template.rec_len self.rec_len = template.rec_len
else: else:
@@ -227,7 +227,7 @@ class File:
fileDescriptor['shortEFID'] = bytes([self.sfi]) fileDescriptor['shortEFID'] = bytes([self.sfi])
if self.df_name: if self.df_name:
fileDescriptor['dfName'] = self.df_name fileDescriptor['dfName'] = self.df_name
if self.arr and self.arr != self.template.arr.to_bytes(1, 'big'): if self.arr and self.arr != self.template.arr.to_bytes(1):
fileDescriptor['securityAttributesReferenced'] = self.arr fileDescriptor['securityAttributesReferenced'] = self.arr
if self.file_type in ['LF', 'CY']: if self.file_type in ['LF', 'CY']:
fdb_dec['file_type'] = 'working_ef' fdb_dec['file_type'] = 'working_ef'
@@ -264,7 +264,7 @@ class File:
if self.read_and_update_when_deact: if self.read_and_update_when_deact:
spfi |= 0x40 # TS 102 222 Table 5 spfi |= 0x40 # TS 102 222 Table 5
if spfi != 0x00: if spfi != 0x00:
pefi['specialFileInformation'] = spfi.to_bytes(1, 'big') pefi['specialFileInformation'] = spfi.to_bytes(1)
if self.fill_pattern: if self.fill_pattern:
if not self.fill_pattern_repeat: if not self.fill_pattern_repeat:
pefi['fillPattern'] = self.fill_pattern pefi['fillPattern'] = self.fill_pattern
@@ -334,7 +334,7 @@ class File:
self.fill_pattern = pefi['fillPattern'] self.fill_pattern = pefi['fillPattern']
self.fill_pattern_repeat = False self.fill_pattern_repeat = False
elif fdb_dec['file_type'] == 'df': elif fdb_dec['file_type'] == 'df':
# only set it, if an earlier call to from_template() didn't already set it, as # only set it, if an earlier call to from_template() didn't alrady set it, as
# the template can differentiate between MF, DF and ADF (unlike FDB) # the template can differentiate between MF, DF and ADF (unlike FDB)
if not self.file_type: if not self.file_type:
self.file_type = 'DF' self.file_type = 'DF'
@@ -427,7 +427,7 @@ class File:
class ProfileElement: class ProfileElement:
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly, """Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
but it's more likely sub-classed with a specific class for the specific profile element type, like e.g but ist more likely sub-classed with a specific class for the specific profile element type, like e.g
ProfileElementHeader, ProfileElementMF, ... ProfileElementHeader, ProfileElementMF, ...
""" """
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access', FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
@@ -440,7 +440,7 @@ class ProfileElement:
'genericFileManagement': 'gfm-header', 'genericFileManagement': 'gfm-header',
'akaParameter': 'aka-header', 'akaParameter': 'aka-header',
'cdmaParameter': 'cdma-header', 'cdmaParameter': 'cdma-header',
# note how they couldn't even consistently capitalize the 'header' suffix :( # note how they couldn't even consistently captialize the 'header' suffix :(
'application': 'app-Header', 'application': 'app-Header',
'pukCodes': 'puk-Header', 'pukCodes': 'puk-Header',
'pinCodes': 'pin-Header', 'pinCodes': 'pin-Header',
@@ -628,7 +628,7 @@ class FsProfileElement(ProfileElement):
# this is a template that belongs into the [A]DF of another template # this is a template that belongs into the [A]DF of another template
# 1) find the PE for the referenced template # 1) find the PE for the referenced template
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid) parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
# 2) resolve the [A]DF that forms the base of that parent PE # 2) resolve te [A]DF that forms the base of that parent PE
pe_df = parent_pe.files[template.parent.base_df().pe_name].node pe_df = parent_pe.files[template.parent.base_df().pe_name].node
self.pe_sequence.cur_df = pe_df self.pe_sequence.cur_df = pe_df
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file) self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
@@ -649,7 +649,7 @@ class FsProfileElement(ProfileElement):
self.add_file(file) self.add_file(file)
def create_file(self, pename: str) -> File: def create_file(self, pename: str) -> File:
"""Programmatically create a file by its PE-Name.""" """Programatically create a file by its PE-Name."""
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID) template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
file = File(pename, None, template.files_by_pename.get(pename, None)) file = File(pename, None, template.files_by_pename.get(pename, None))
self.add_file(file) self.add_file(file)
@@ -985,9 +985,9 @@ class SecurityDomainKey:
self.key_components = key_components self.key_components = key_components
def __repr__(self) -> str: def __repr__(self) -> str:
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=0x%x, Comp=%s)' % (self.key_version_number, return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=%s, Comp=%s)' % (self.key_version_number,
self.key_identifier, self.key_identifier,
build_construct(KeyUsageQualifier, self.key_usage_qualifier)[0], self.key_usage_qualifier,
repr(self.key_components)) repr(self.key_components))
@classmethod @classmethod
@@ -1409,7 +1409,7 @@ class ProfileElementHeader(ProfileElement):
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None, iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
**kwargs): **kwargs):
"""You would usually initialize an instance either with a "decoded" argument (as read from """You would usually initialize an instance either with a "decoded" argument (as read from
a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're a DER-encoded SAIP file via asn1tools), or [some of] the othe arguments in case you're
constructing a Profile Header from scratch. constructing a Profile Header from scratch.
Args: Args:
@@ -1562,7 +1562,7 @@ class ProfileElementSequence:
def _rebuild_pes_by_naa(self) -> None: def _rebuild_pes_by_naa(self) -> None:
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form, """rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
which basically means for every NAA there's a list of instances, and each consists which basically means for every NAA there's a lsit of instances, and each consists
of a list of a list of PEs.""" of a list of a list of PEs."""
self.pres_by_naa = {} self.pres_by_naa = {}
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end'] petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
@@ -1690,7 +1690,7 @@ class ProfileElementSequence:
i += 1 i += 1
def get_index_by_pe(self, pe: ProfileElement) -> int: def get_index_by_pe(self, pe: ProfileElement) -> int:
"""Return a list with the indices of all instances of PEs of petype.""" """Return a list with the indicies of all instances of PEs of petype."""
ret = [] ret = []
i = 0 i = 0
for cur in self.pe_list: for cur in self.pe_list:
@@ -1711,7 +1711,7 @@ class ProfileElementSequence:
self.insert_at_index(idx+1, pe_new) self.insert_at_index(idx+1, pe_new)
def get_index_by_type(self, petype: str) -> List[int]: def get_index_by_type(self, petype: str) -> List[int]:
"""Return a list with the indices of all instances of PEs of petype.""" """Return a list with the indicies of all instances of PEs of petype."""
ret = [] ret = []
i = 0 i = 0
for pe in self.pe_list: for pe in self.pe_list:
@@ -1736,7 +1736,7 @@ class ProfileElementSequence:
for service in naa.mandatory_services: for service in naa.mandatory_services:
if service in hdr.decoded['eUICC-Mandatory-services']: if service in hdr.decoded['eUICC-Mandatory-services']:
del hdr.decoded['eUICC-Mandatory-services'][service] del hdr.decoded['eUICC-Mandatory-services'][service]
# remove any associated mandatory filesystem templates # remove any associaed mandatory filesystem templates
for template in naa.templates: for template in naa.templates:
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']: if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)] hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
@@ -2040,8 +2040,7 @@ class FsNodeADF(FsNodeDF):
super().__init__(fid, parent, file, name) super().__init__(fid, parent, file, name)
def __str__(self): def __str__(self):
# self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name))
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
class FsNodeMF(FsNodeDF): class FsNodeMF(FsNodeDF):
"""The MF (Master File) in the filesystem hierarchy.""" """The MF (Master File) in the filesystem hierarchy."""

View File

@@ -68,7 +68,7 @@ class CheckBasicStructure(ProfileConstraintChecker):
def check_optional_ordering(self, pes: ProfileElementSequence): def check_optional_ordering(self, pes: ProfileElementSequence):
"""Check the ordering of optional PEs following the respective mandatory ones.""" """Check the ordering of optional PEs following the respective mandatory ones."""
# ordering and required dependencies # ordering and required depenencies
self._is_after_if_exists(pes,'opt-usim', 'usim') self._is_after_if_exists(pes,'opt-usim', 'usim')
self._is_after_if_exists(pes,'opt-isim', 'isim') self._is_after_if_exists(pes,'opt-isim', 'isim')
self._is_after_if_exists(pes,'gsm-access', 'usim') self._is_after_if_exists(pes,'gsm-access', 'usim')

View File

@@ -25,6 +25,7 @@ from cryptography.hazmat.primitives.serialization import load_pem_private_key, E
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from pySim.utils import b2h from pySim.utils import b2h
from . import x509_err
def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool: def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool:
"""Verify if 'signed' certificate was signed using 'signer'.""" """Verify if 'signed' certificate was signed using 'signer'."""
@@ -64,9 +65,6 @@ class oid:
id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6') id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7') id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
class VerifyError(Exception):
"""An error during certificate verification,"""
class CertificateSet: class CertificateSet:
"""A set of certificates consisting of a trusted [self-signed] CA root certificate, """A set of certificates consisting of a trusted [self-signed] CA root certificate,
and an optional number of intermediate certificates. Can be used to verify the certificate chain and an optional number of intermediate certificates. Can be used to verify the certificate chain
@@ -135,7 +133,7 @@ class CertificateSet:
# we cannot check if there's no CRL # we cannot check if there's no CRL
return return
if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr): if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr):
raise VerifyError('Certificate is present in CRL, verification failed') raise x509_err.CertificateRevoked()
def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100): def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100):
"""Verify if a given certificate's signature chain can be traced back to the root CA of this """Verify if a given certificate's signature chain can be traced back to the root CA of this
@@ -150,13 +148,13 @@ class CertificateSet:
return return
parent_cert = self.intermediate_certs.get(aki, None) parent_cert = self.intermediate_certs.get(aki, None)
if not parent_cert: if not parent_cert:
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki)) raise x509_err.MissingIntermediateCert(b2h(aki))
check_signed(c, parent_cert) check_signed(c, parent_cert)
# if we reach here, we passed (no exception raised) # if we reach here, we passed (no exception raised)
c = parent_cert c = parent_cert
depth += 1 depth += 1
if depth > max_depth: if depth > max_depth:
raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth) raise x509_err.MaxDepthExceeded(max_depth, depth)
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes: def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:

58
pySim/esim/x509_err.py Normal file
View File

@@ -0,0 +1,58 @@
"""X.509 certificate verification exceptions for GSMA eSIM."""
class VerifyError(Exception):
"""Base class for certificate verification errors."""
pass
class MissingIntermediateCert(VerifyError):
"""Raised when an intermediate certificate in the chain cannot be found."""
def __init__(self, auth_key_id: str):
self.auth_key_id = auth_key_id
super().__init__(f'Could not find intermediate certificate for AuthKeyId {auth_key_id}')
class CertificateRevoked(VerifyError):
"""Raised when a certificate is found in the CRL."""
def __init__(self, cert_serial: str = None):
self.cert_serial = cert_serial
msg = 'Certificate is present in CRL, verification failed'
if cert_serial:
msg += f' (serial: {cert_serial})'
super().__init__(msg)
class MaxDepthExceeded(VerifyError):
"""Raised when certificate chain depth exceeds the maximum allowed."""
def __init__(self, max_depth: int, actual_depth: int):
self.max_depth = max_depth
self.actual_depth = actual_depth
super().__init__(f'Maximum depth {max_depth} exceeded while verifying certificate chain (actual: {actual_depth})')
class SignatureVerification(VerifyError):
"""Raised when certificate signature verification fails."""
def __init__(self, cert_subject: str = None, signer_subject: str = None):
self.cert_subject = cert_subject
self.signer_subject = signer_subject
msg = 'Certificate signature verification failed'
if cert_subject and signer_subject:
msg += f': {cert_subject} not signed by {signer_subject}'
super().__init__(msg)
class InvalidCertificate(VerifyError):
"""Raised when a certificate is invalid (missing required fields, wrong type, etc)."""
def __init__(self, reason: str):
self.reason = reason
super().__init__(f'Invalid certificate: {reason}')
class CertificateExpired(VerifyError):
"""Raised when a certificate has expired."""
def __init__(self, cert_subject: str = None):
self.cert_subject = cert_subject
msg = 'Certificate has expired'
if cert_subject:
msg += f': {cert_subject}'
super().__init__(msg)

View File

@@ -39,7 +39,7 @@ from osmocom.utils import h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, i
from osmocom.tlv import bertlv_parse_one from osmocom.tlv import bertlv_parse_one
from osmocom.construct import filter_dict, parse_construct, build_construct from osmocom.construct import filter_dict, parse_construct, build_construct
from pySim.utils import sw_match, decomposeATR from pySim.utils import sw_match
from pySim.jsonpath import js_path_modify from pySim.jsonpath import js_path_modify
from pySim.commands import SimCardCommands from pySim.commands import SimCardCommands
from pySim.exceptions import SwMatchError from pySim.exceptions import SwMatchError
@@ -86,7 +86,7 @@ class CardFile:
self.service = service self.service = service
self.shell_commands = [] # type: List[CommandSet] self.shell_commands = [] # type: List[CommandSet]
# Note: the basic properties (fid, name, etc.) are verified when # Note: the basic properties (fid, name, ect.) are verified when
# the file is attached to a parent file. See method add_file() in # the file is attached to a parent file. See method add_file() in
# class Card DF # class Card DF
@@ -266,7 +266,7 @@ class CardFile:
def get_profile(self): def get_profile(self):
"""Get the profile associated with this file. If this file does not have any """Get the profile associated with this file. If this file does not have any
profile assigned, try to find a file above (usually the MF) in the filesystem profile assigned, try to find a file above (usually the MF) in the filesystem
hierarchy that has a profile assigned hirarchy that has a profile assigned
""" """
# If we have a profile set, return it # If we have a profile set, return it
@@ -679,7 +679,7 @@ class TransparentEF(CardEF):
Args: Args:
fid : File Identifier (4 hex digits) fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional) sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, like EF_ICCID name : Brief name of the file, lik EF_ICCID
desc : Description of the file desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy parent : Parent CardFile object within filesystem hierarchy
size : tuple of (minimum_size, recommended_size) size : tuple of (minimum_size, recommended_size)
@@ -982,11 +982,11 @@ class LinFixedEF(CardEF):
Args: Args:
fid : File Identifier (4 hex digits) fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional) sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, like EF_ICCID name : Brief name of the file, lik EF_ICCID
desc : Description of the file desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy parent : Parent CardFile object within filesystem hierarchy
rec_len : Tuple of (minimum_length, recommended_length) rec_len : Tuple of (minimum_length, recommended_length)
leftpad: On write, data must be padded from the left to fit physical record length leftpad: On write, data must be padded from the left to fit pysical record length
""" """
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs) super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
self.rec_len = rec_len self.rec_len = rec_len
@@ -1422,7 +1422,7 @@ class BerTlvEF(CardEF):
Args: Args:
fid : File Identifier (4 hex digits) fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional) sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, like EF_ICCID name : Brief name of the file, lik EF_ICCID
desc : Description of the file desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy parent : Parent CardFile object within filesystem hierarchy
size : tuple of (minimum_size, recommended_size) size : tuple of (minimum_size, recommended_size)
@@ -1455,7 +1455,7 @@ class BerTlvEF(CardEF):
export_str += "delete_all\n" export_str += "delete_all\n"
for t in tags: for t in tags:
result = lchan.retrieve_data(t) result = lchan.retrieve_data(t)
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0])) (tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val))) export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
return export_str.strip() return export_str.strip()
@@ -1495,7 +1495,7 @@ class CardApplication:
self.name = name self.name = name
self.adf = adf self.adf = adf
self.sw = sw or {} self.sw = sw or {}
# back-reference from ADF to Application # back-reference from ADF to Applicaiton
if self.adf: if self.adf:
self.aid = aid or self.adf.aid self.aid = aid or self.adf.aid
self.adf.application = self self.adf.application = self
@@ -1545,13 +1545,6 @@ class CardModel(abc.ABC):
if atr == card_atr: if atr == card_atr:
print("Detected CardModel:", cls.__name__) print("Detected CardModel:", cls.__name__)
return True return True
# if nothing found try to just compare the Historical Bytes of the ATR
card_atr_hb = decomposeATR(card_atr)['hb']
for atr in cls._atrs:
atr_hb = decomposeATR(atr)['hb']
if atr_hb == card_atr_hb:
print("Detected CardModel:", cls.__name__)
return True
return False return False
@staticmethod @staticmethod
@@ -1572,7 +1565,7 @@ class Path:
p = p.split('/') p = p.split('/')
elif len(p) and isinstance(p[0], int): elif len(p) and isinstance(p[0], int):
p = ['%04x' % x for x in p] p = ['%04x' % x for x in p]
# make sure internal representation always is uppercase only # make sure internal representation alwas is uppercase only
self.list = [x.upper() for x in p] self.list = [x.upper() for x in p]
def __str__(self) -> str: def __str__(self) -> str:

View File

@@ -627,7 +627,7 @@ class ADF_SD(CardADF):
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b'' kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
kcv = b2h(kcv_bin) kcv = b2h(kcv_bin)
if self._cmd.lchan.scc.scp: if self._cmd.lchan.scc.scp:
# encrypted key data with DEK of current SCP # encrypte key data with DEK of current SCP
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i]))) kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
else: else:
# (for example) during personalization, DEK might not be required) # (for example) during personalization, DEK might not be required)
@@ -755,7 +755,7 @@ class ADF_SD(CardADF):
inst_load_parser = argparse.ArgumentParser() inst_load_parser = argparse.ArgumentParser()
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True, inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
help='AID of the loaded file') help='AID of the loded file')
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='', inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
help='AID of the Security Domain into which the file shalle be added') help='AID of the Security Domain into which the file shalle be added')
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='', inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
@@ -845,7 +845,7 @@ class ADF_SD(CardADF):
# TODO:tune chunk_len based on the overhead of the used SCP? # TODO:tune chunk_len based on the overhead of the used SCP?
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case # build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
# transfer this in various chunks to the card # transfer this in vaious chunks to the card
total_size = len(remainder) total_size = len(remainder)
block_nr = 0 block_nr = 0
while len(remainder): while len(remainder):

View File

@@ -104,4 +104,4 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK) # KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK) # KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block # KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s support # KVN 0xFF reserved for ISD with SCP02 without SCP80 s upport

View File

@@ -97,7 +97,7 @@ class CapFile():
raise ValueError("invalid cap file, %s missing!" % required_components[component]) raise ValueError("invalid cap file, %s missing!" % required_components[component])
def get_loadfile(self) -> bytes: def get_loadfile(self) -> bytes:
"""Get the executable loadfile as hexstring""" """Get the executeable loadfile as hexstring"""
# Concatenate all cap file components in the specified order # Concatenate all cap file components in the specified order
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3 # see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
loadfile = self.__component['Header'] loadfile = self.__component['Header']

View File

@@ -495,7 +495,7 @@ class IsimCard(UiccCardBase):
class MagicSimBase(abc.ABC, SimCard): class MagicSimBase(abc.ABC, SimCard):
""" """
These cards uses several record based EFs to store the provider infos, Theses cards uses several record based EFs to store the provider infos,
each possible provider uses a specific record number in each EF. The each possible provider uses a specific record number in each EF. The
indexes used are ( where N is the number of providers supported ) : indexes used are ( where N is the number of providers supported ) :
- [2 .. N+1] for the operator name - [2 .. N+1] for the operator name
@@ -644,7 +644,7 @@ class MagicSim(MagicSimBase):
class FakeMagicSim(SimCard): class FakeMagicSim(SimCard):
""" """
These cards have a record based EF 3f00/000c that contains the provider Theses cards have a record based EF 3f00/000c that contains the provider
information. See the program method for its format. The records go from information. See the program method for its format. The records go from
1 to N. 1 to N.
""" """

View File

@@ -296,7 +296,7 @@ def dec_addr_tlv(hexstr):
elif addr_type == 0x01: # IPv4 elif addr_type == 0x01: # IPv4
# Skip address tye byte i.e. first byte in value list # Skip address tye byte i.e. first byte in value list
# Skip the unused byte in Octet 4 after address type byte as per 3GPP TS 31.102 # Skip the unused byte in Octect 4 after address type byte as per 3GPP TS 31.102
ipv4 = tlv[2][2:] ipv4 = tlv[2][2:]
content = '.'.join(str(x) for x in ipv4) content = '.'.join(str(x) for x in ipv4)
return (content, '01') return (content, '01')

View File

@@ -1,128 +0,0 @@
# -*- coding: utf-8 -*-
""" pySim: Logging
"""
#
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
from cmd2 import style
class _PySimLogHandler(logging.Handler):
def __init__(self, log_callback):
super().__init__()
self.log_callback = log_callback
def emit(self, record):
formatted_message = self.format(record)
self.log_callback(formatted_message, record)
class PySimLogger:
"""
Static class to centralize the log output of PySim applications. This class can be used to print log messages from
any pySim module. Configuration of the log behaviour (see setup and set_ methods) is entirely optional. In case no
print callback is set (see setup method), the logger will pass the log messages directly to print() without applying
any formatting to the original log message.
"""
LOG_FMTSTR = "%(levelname)s: %(message)s"
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
__formatter = logging.Formatter(LOG_FMTSTR)
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
# No print callback by default, means that log messages are passed directly to print()
print_callback = None
# No specific color scheme by default
colors = {}
# The logging default is non-verbose logging on logging level DEBUG. This is a safe default that works for
# applications that ignore the presence of the PySimLogger class.
verbose = False
logging.root.setLevel(logging.DEBUG)
def __init__(self):
raise RuntimeError('static class, do not instantiate')
@staticmethod
def setup(print_callback = None, colors:dict = {}):
"""
Set a print callback function and color scheme. This function call is optional. In case this method is not
called, default settings apply.
Args:
print_callback : A callback function that accepts the resulting log string as input. The callback should
have the following format: print_callback(message:str)
colors : An optional dict through which certain log levels can be assigned a color.
(e.g. {logging.WARN: YELLOW})
"""
PySimLogger.print_callback = print_callback
PySimLogger.colors = colors
@staticmethod
def set_verbose(verbose:bool = False):
"""
Enable/disable verbose logging. (has no effect in case no print callback is set, see method setup)
Args:
verbose: verbosity (True = verbose logging, False = normal logging)
"""
PySimLogger.verbose = verbose;
@staticmethod
def set_level(level:int = logging.DEBUG):
"""
Set the logging level.
Args:
level: Logging level, valis log leves are: DEBUG, INFO, WARNING, ERROR and CRITICAL
"""
logging.root.setLevel(level)
@staticmethod
def _log_callback(message, record):
if not PySimLogger.print_callback:
# In case no print callback has been set display the message as if it were printed trough a normal
# python print statement.
print(record.message)
else:
# When a print callback is set, use it to display the log line. Apply color if the API user chose one
if PySimLogger.verbose:
formatted_message = logging.Formatter.format(PySimLogger.__formatter_verbose, record)
else:
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
color = PySimLogger.colors.get(record.levelno)
if color:
if type(color) is str:
PySimLogger.print_callback(color + formatted_message + "\033[0m")
else:
PySimLogger.print_callback(style(formatted_message, fg = color))
else:
PySimLogger.print_callback(formatted_message)
@staticmethod
def get(log_facility: str):
"""
Set up and return a new python logger object
Args:
log_facility : Name of log facility (e.g. "MAIN", "RUNTIME"...)
"""
logger = logging.getLogger(log_facility)
handler = _PySimLogHandler(log_callback=PySimLogger._log_callback)
logger.addHandler(handler)
return logger

View File

@@ -110,7 +110,7 @@ class CardProfile:
@abc.abstractmethod @abc.abstractmethod
def _try_match_card(cls, scc: SimCardCommands) -> None: def _try_match_card(cls, scc: SimCardCommands) -> None:
"""Try to see if the specific profile matches the card. This method is a """Try to see if the specific profile matches the card. This method is a
placeholder that is overloaded by specific derived classes. The method placeholder that is overloaded by specific dirived classes. The method
actively probes the card to make sure the profile class matches the actively probes the card to make sure the profile class matches the
physical card. This usually also means that the card is reset during physical card. This usually also means that the card is reset during
the process, so this method must not be called at random times. It may the process, so this method must not be called at random times. It may

View File

@@ -23,9 +23,6 @@ from osmocom.tlv import bertlv_parse_one
from pySim.exceptions import * from pySim.exceptions import *
from pySim.filesystem import * from pySim.filesystem import *
from pySim.log import PySimLogger
log = PySimLogger.get("RUNTIME")
def lchan_nr_from_cla(cla: int) -> int: def lchan_nr_from_cla(cla: int) -> int:
"""Resolve the logical channel number from the CLA byte.""" """Resolve the logical channel number from the CLA byte."""
@@ -47,7 +44,6 @@ class RuntimeState:
card : pysim.cards.Card instance card : pysim.cards.Card instance
profile : CardProfile instance profile : CardProfile instance
""" """
self.mf = CardMF(profile=profile) self.mf = CardMF(profile=profile)
self.card = card self.card = card
self.profile = profile self.profile = profile
@@ -64,13 +60,10 @@ class RuntimeState:
self.card.set_apdu_parameter( self.card.set_apdu_parameter(
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl) cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
# make sure MF is selected before probing for Addons
self.lchan[0].select('MF')
for addon_cls in self.profile.addons: for addon_cls in self.profile.addons:
addon = addon_cls() addon = addon_cls()
if addon.probe(self.card): if addon.probe(self.card):
log.info("Detected %s Add-on \"%s\"" % (self.profile, addon)) print("Detected %s Add-on \"%s\"" % (self.profile, addon))
for f in addon.files_in_mf: for f in addon.files_in_mf:
self.mf.add_file(f) self.mf.add_file(f)
@@ -104,18 +97,18 @@ class RuntimeState:
apps_taken = [] apps_taken = []
if aids_card: if aids_card:
aids_taken = [] aids_taken = []
log.info("AIDs on card:") print("AIDs on card:")
for a in aids_card: for a in aids_card:
for f in apps_profile: for f in apps_profile:
if f.aid in a: if f.aid in a:
log.info(" %s: %s (EF.DIR)" % (f.name, a)) print(" %s: %s (EF.DIR)" % (f.name, a))
aids_taken.append(a) aids_taken.append(a)
apps_taken.append(f) apps_taken.append(f)
aids_unknown = set(aids_card) - set(aids_taken) aids_unknown = set(aids_card) - set(aids_taken)
for a in aids_unknown: for a in aids_unknown:
log.info(" unknown: %s (EF.DIR)" % a) print(" unknown: %s (EF.DIR)" % a)
else: else:
log.warn("EF.DIR seems to be empty!") print("warning: EF.DIR seems to be empty!")
# Some card applications may not be registered in EF.DIR, we will actively # Some card applications may not be registered in EF.DIR, we will actively
# probe for those applications # probe for those applications
@@ -130,7 +123,7 @@ class RuntimeState:
_data, sw = self.card.select_adf_by_aid(f.aid) _data, sw = self.card.select_adf_by_aid(f.aid)
self.selected_adf = f self.selected_adf = f
if sw == "9000": if sw == "9000":
log.info(" %s: %s" % (f.name, f.aid)) print(" %s: %s" % (f.name, f.aid))
apps_taken.append(f) apps_taken.append(f)
except (SwMatchError, ProtocolError): except (SwMatchError, ProtocolError):
pass pass
@@ -154,7 +147,7 @@ class RuntimeState:
# select MF to reset internal state and to verify card really works # select MF to reset internal state and to verify card really works
self.lchan[0].select('MF', cmd_app) self.lchan[0].select('MF', cmd_app)
self.lchan[0].selected_adf = None self.lchan[0].selected_adf = None
# store ATR as part of our card identities dict # store ATR as part of our card identies dict
self.identity['ATR'] = atr self.identity['ATR'] = atr
return atr return atr
@@ -328,7 +321,7 @@ class RuntimeLchan:
# If we succeed, we know that the file exists on the card and we may # If we succeed, we know that the file exists on the card and we may
# proceed with creating a new CardEF object in the local file model at # proceed with creating a new CardEF object in the local file model at
# run time. In case the file does not exist on the card, we just abort. # run time. In case the file does not exist on the card, we just abort.
# The state on the card (selected file/application) won't be changed, # The state on the card (selected file/application) wont't be changed,
# so we do not have to update any state in that case. # so we do not have to update any state in that case.
(data, _sw) = self.scc.select_file(fid) (data, _sw) = self.scc.select_file(fid)
except SwMatchError as swm: except SwMatchError as swm:
@@ -477,15 +470,11 @@ class RuntimeLchan:
def get_file_for_filename(self, name: str): def get_file_for_filename(self, name: str):
"""Get the related CardFile object for a specified filename.""" """Get the related CardFile object for a specified filename."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables() sels = self.selected_file.get_selectables()
return sels[name] return sels[name]
def activate_file(self, name: str): def activate_file(self, name: str):
"""Request ACTIVATE FILE of specified file.""" """Request ACTIVATE FILE of specified file."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables() sels = self.selected_file.get_selectables()
f = sels[name] f = sels[name]
data, sw = self.scc.activate_file(f.fid) data, sw = self.scc.activate_file(f.fid)
@@ -518,47 +507,6 @@ class RuntimeLchan:
dec_data = self.selected_file.decode_hex(data) dec_data = self.selected_file.decode_hex(data)
return (dec_data, sw) return (dec_data, sw)
def __get_writeable_size(self):
""" Determine the writable size (file or record) using the cached FCP parameters of the currently selected
file. Return None in case the writeable size cannot be determined (no FCP available, FCP lacks size
information).
"""
fcp = self.selected_file_fcp
if not fcp:
return None
structure = fcp.get('file_descriptor', {}).get('file_descriptor_byte', {}).get('structure')
if not structure:
return None
if structure == 'transparent':
return fcp.get('file_size')
elif structure == 'linear_fixed':
return fcp.get('file_descriptor', {}).get('record_len')
else:
return None
def __check_writeable_size(self, data_len):
""" Guard against unsuccessful writes caused by attempts to write data that exceeds the file limits. """
writeable_size = self.__get_writeable_size()
if not writeable_size:
return
if isinstance(self.selected_file, TransparentEF):
writeable_name = "file"
elif isinstance(self.selected_file, LinFixedEF):
writeable_name = "record"
else:
writeable_name = "object"
if data_len > writeable_size:
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
(data_len, writeable_name, writeable_size, data_len - writeable_size))
elif data_len < writeable_size:
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
def update_binary(self, data_hex: str, offset: int = 0): def update_binary(self, data_hex: str, offset: int = 0):
"""Update transparent EF binary data. """Update transparent EF binary data.
@@ -569,7 +517,6 @@ class RuntimeLchan:
if not isinstance(self.selected_file, TransparentEF): if not isinstance(self.selected_file, TransparentEF):
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file, raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
self.selected_file.__class__.__mro__)) self.selected_file.__class__.__mro__))
self.__check_writeable_size(len(data_hex) // 2 + offset)
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write) return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
def update_binary_dec(self, data: dict): def update_binary_dec(self, data: dict):
@@ -617,7 +564,6 @@ class RuntimeLchan:
if not isinstance(self.selected_file, LinFixedEF): if not isinstance(self.selected_file, LinFixedEF):
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file, raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
self.selected_file.__class__.__mro__)) self.selected_file.__class__.__mro__))
self.__check_writeable_size(len(data_hex) // 2)
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex, return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
conserve=self.rs.conserve_write, conserve=self.rs.conserve_write,
leftpad=self.selected_file.leftpad) leftpad=self.selected_file.leftpad)

View File

@@ -32,7 +32,7 @@ class SecureChannel(abc.ABC):
pass pass
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple: def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
"""Wrapper function to wrap command APDU and unwrap response APDU around send_apdu callable.""" """Wrapper function to wrap command APDU and unwrap repsonse APDU around send_apdu callable."""
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu))) pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
res, sw = send_fn(pdu_wrapped, *args, **kwargs) res, sw = send_fn(pdu_wrapped, *args, **kwargs)
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res))) res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))

View File

@@ -200,7 +200,7 @@ class LinkBase(abc.ABC):
# It *was* successful after all -- the extra pieces FETCH handled # It *was* successful after all -- the extra pieces FETCH handled
# need not concern the caller. # need not concern the caller.
rv = (rv[0], '9000') rv = (rv[0], '9000')
# proactive sim as per TS 102 221 Section 7.4.2 # proactive sim as per TS 102 221 Setion 7.4.2
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place) # TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw) fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
# Setting this in case we later decide not to send a terminal # Setting this in case we later decide not to send a terminal
@@ -228,7 +228,7 @@ class LinkBase(abc.ABC):
# Structure as per TS 102 223 V4.4.0 Section 6.8 # Structure as per TS 102 223 V4.4.0 Section 6.8
# Testing hint: The value of tail does not influence the behavior # Testing hint: The value of tail does not influence the behavior
# of an SJA2 that sent an SMS, so this is implemented only # of an SJA2 that sent ans SMS, so this is implemented only
# following TS 102 223, and not fully tested. # following TS 102 223, and not fully tested.
ti_list_bin = [x.to_tlv() for x in ti_list] ti_list_bin = [x.to_tlv() for x in ti_list]
tail = b''.join(ti_list_bin) tail = b''.join(ti_list_bin)

View File

@@ -750,7 +750,7 @@ class EF_ARR(LinFixedEF):
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser) @cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts): def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form.""" """Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.RECORD_NR) (data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
data = self._cmd.lchan.selected_file.flatten(data) data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline) self._cmd.poutput_json(data, opts.oneline)

View File

@@ -208,7 +208,7 @@ EF_5G_PROSE_ST_map = {
5: '5G ProSe configuration data for usage information reporting', 5: '5G ProSe configuration data for usage information reporting',
} }
# Mapping between USIM Enabled Service Number and its description # Mapping between USIM Enbled Service Number and its description
EF_EST_map = { EF_EST_map = {
1: 'Fixed Dialling Numbers (FDN)', 1: 'Fixed Dialling Numbers (FDN)',
2: 'Barred Dialling Numbers (BDN)', 2: 'Barred Dialling Numbers (BDN)',

View File

@@ -119,7 +119,7 @@ class EF_AC_GBAUAPI(LinFixedEF):
"""The use of this EF is eescribed in 3GPP TS 31.130""" """The use of this EF is eescribed in 3GPP TS 31.130"""
class AppletNafAccessControl(BER_TLV_IE, tag=0x80): class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV # the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
# length field which will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can # length field whihc will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
# safely be assumed shorter than that # safely be assumed shorter than that
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes), _construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
'naf_id'/Prefixed(Int8ub, GreedyBytes)) 'naf_id'/Prefixed(Int8ub, GreedyBytes))

View File

@@ -267,11 +267,11 @@ class EF_SMSP(LinFixedEF):
raise ValueError raise ValueError
def _encode(self, obj, context, path): def _encode(self, obj, context, path):
if obj <= 12*60: if obj <= 12*60:
return obj // 5 - 1 return obj/5 - 1
elif obj <= 24*60: elif obj <= 24*60:
return 143 + ((obj - (12 * 60)) // 30) return 143 + ((obj - (12 * 60)) // 30)
elif obj <= 30 * 24 * 60: elif obj <= 30 * 24 * 60:
return 166 + (obj // (24 * 60)) return 166 + (obj / (24 * 60))
elif obj <= 63 * 7 * 24 * 60: elif obj <= 63 * 7 * 24 * 60:
return 192 + (obj // (7 * 24 * 60)) return 192 + (obj // (7 * 24 * 60))
else: else:
@@ -280,7 +280,7 @@ class EF_SMSP(LinFixedEF):
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs): def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs) super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10)))) ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))), self._construct = Struct('alpha_id'/COptional(GsmStringAdapter(Rpad(Bytes(this._.total_len-28)))),
'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2, 'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
tp_pid=3, tp_dcs=4, tp_vp=5)), tp_pid=3, tp_dcs=4, tp_vp=5)),
'tp_dest_addr'/ScAddr, 'tp_dest_addr'/ScAddr,
@@ -1007,7 +1007,7 @@ class EF_ICCID(TransparentEF):
def _encode_hex(self, abstract, **kwargs): def _encode_hex(self, abstract, **kwargs):
return enc_iccid(abstract['iccid']) return enc_iccid(abstract['iccid'])
# TS 102 221 Section 13.3 / TS 31.101 Section 13 / TS 51.011 Section 10.1.2 # TS 102 221 Section 13.3 / TS 31.101 Secction 13 / TS 51.011 Section 10.1.2
class EF_PL(TransRecEF): class EF_PL(TransRecEF):
_test_de_encode = [ _test_de_encode = [
( '6465', "de" ), ( '6465', "de" ),

View File

@@ -15,7 +15,6 @@ from osmocom.tlv import bertlv_encode_tag, bertlv_encode_len
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com> # Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org> # Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
# Copyright (C) 2009-2022 Ludovic Rousseau
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
@@ -331,7 +330,7 @@ def derive_mnc(digit1: int, digit2: int, digit3: int = 0x0f) -> int:
mnc = 0 mnc = 0
# 3-rd digit is optional for the MNC. If present # 3-rd digit is optional for the MNC. If present
# the algorithm is the same as for the MCC. # the algorythm is the same as for the MCC.
if digit3 != 0x0f: if digit3 != 0x0f:
return derive_mcc(digit1, digit2, digit3) return derive_mcc(digit1, digit2, digit3)
@@ -411,7 +410,7 @@ def get_addr_type(addr):
fqdn_flag = True fqdn_flag = True
for i in addr_list: for i in addr_list:
# Only Alphanumeric characters and hyphen - RFC 1035 # Only Alpha-numeric characters and hyphen - RFC 1035
import re import re
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i): if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
fqdn_flag = False fqdn_flag = False
@@ -477,7 +476,7 @@ def expand_hex(hexstring, length):
"""Expand a given hexstring to a specified length by replacing "." or ".." """Expand a given hexstring to a specified length by replacing "." or ".."
with a filler that is derived from the neighboring nibbles respective with a filler that is derived from the neighboring nibbles respective
bytes. Usually this will be the nibble respective byte before "." or bytes. Usually this will be the nibble respective byte before "." or
"..", except when the string begins with "." or "..", then the nibble "..", execpt when the string begins with "." or "..", then the nibble
respective byte after "." or ".." is used.". In case the string cannot respective byte after "." or ".." is used.". In case the string cannot
be expanded for some reason, the input string is returned unmodified. be expanded for some reason, the input string is returned unmodified.
@@ -586,138 +585,10 @@ def parse_command_apdu(apdu: bytes) -> int:
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu)) raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
def normalizeATR(atr):
"""Transform an ATR in list of integers.
valid input formats are
"3B A7 00 40 18 80 65 A2 08 01 01 52"
"3B:A7:00:40:18:80:65:A2:08:01:01:52"
Args:
atr: string
Returns:
list of bytes
>>> normalize("3B:A7:00:40:18:80:65:A2:08:01:01:52")
[59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82]
"""
atr = atr.replace(":", "")
atr = atr.replace(" ", "")
res = []
while len(atr) >= 2:
byte, atr = atr[:2], atr[2:]
res.append(byte)
if len(atr) > 0:
raise ValueError("warning: odd string, remainder: %r" % atr)
atr = [int(x, 16) for x in res]
return atr
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
def decomposeATR(atr_txt):
"""Decompose the ATR in elementary fields
Args:
atr_txt: ATR as a hex bytes string
Returns:
dictionary of field and values
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
{ 'T0': {'value': 167},
'TB': {1: {'value': 0}},
'TC': {2: {'value': 24}},
'TD': {1: {'value': 64}},
'TS': {'value': 59},
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'hbn': 7}
"""
ATR_PROTOCOL_TYPE_T0 = 0
atr_txt = normalizeATR(atr_txt)
atr = {}
# the ATR itself as a list of integers
atr["atr"] = atr_txt
# store TS and T0
atr["TS"] = {"value": atr_txt[0]}
TDi = atr_txt[1]
atr["T0"] = {"value": TDi}
hb_length = TDi & 15
pointer = 1
# protocol number
pn = 1
# store number of historical bytes
atr["hbn"] = TDi & 0xF
while pointer < len(atr_txt):
# Check TAi is present
if (TDi | 0xEF) == 0xFF:
pointer += 1
if "TA" not in atr:
atr["TA"] = {}
atr["TA"][pn] = {"value": atr_txt[pointer]}
# Check TBi is present
if (TDi | 0xDF) == 0xFF:
pointer += 1
if "TB" not in atr:
atr["TB"] = {}
atr["TB"][pn] = {"value": atr_txt[pointer]}
# Check TCi is present
if (TDi | 0xBF) == 0xFF:
pointer += 1
if "TC" not in atr:
atr["TC"] = {}
atr["TC"][pn] = {"value": atr_txt[pointer]}
# Check TDi is present
if (TDi | 0x7F) == 0xFF:
pointer += 1
if "TD" not in atr:
atr["TD"] = {}
TDi = atr_txt[pointer]
atr["TD"][pn] = {"value": TDi}
if (TDi & 0x0F) != ATR_PROTOCOL_TYPE_T0:
atr["TCK"] = True
pn += 1
else:
break
# Store historical bytes
atr["hb"] = {"value": atr_txt[pointer + 1 : pointer + 1 + hb_length]}
# Store TCK
last = pointer + 1 + hb_length
if "TCK" in atr:
try:
atr["TCK"] = {"value": atr_txt[last]}
except IndexError:
atr["TCK"] = {"value": -1}
last += 1
if len(atr_txt) > last:
atr["extra"] = atr_txt[last:]
if len(atr["hb"]["value"]) < hb_length:
missing = hb_length - len(atr["hb"]["value"])
if missing > 1:
(t1, t2) = ("s", "are")
else:
(t1, t2) = ("", "is")
atr["warning"] = "ATR is truncated: %d byte%s %s missing" % (missing, t1, t2)
return atr
class DataObject(abc.ABC): class DataObject(abc.ABC):
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one """A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
simply has any number of different TLVs that may occur in any order at any point, ISO 7816 simply has any number of different TLVs that may occur in any order at any point, ISO 7816
has the habit of specifying TLV data but with very specific ordering, or specific choices of has the habit of specifying TLV data but with very spcific ordering, or specific choices of
tags at specific points in a stream. This class tries to represent this.""" tags at specific points in a stream. This class tries to represent this."""
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None): def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
@@ -839,7 +710,7 @@ class TL0_DataObject(DataObject):
class DataObjectCollection: class DataObjectCollection:
"""A DataObjectCollection consists of multiple Data Objects identified by their tags. """A DataObjectCollection consits of multiple Data Objects identified by their tags.
A given encoded DO may contain any of them in any order, and may contain multiple instances A given encoded DO may contain any of them in any order, and may contain multiple instances
of each DO.""" of each DO."""

View File

@@ -1,6 +1,3 @@
[build-system] [build-system]
requires = ["setuptools", "wheel"] requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[tool.pylint.main]
ignored-classes = ["twisted.internet.reactor"]

View File

@@ -1,7 +1,7 @@
pyscard pyscard
pyserial pyserial
pytlv pytlv
cmd2>=2.6.2,<3.0 cmd2>=1.5
jsonpath-ng jsonpath-ng
construct>=2.10.70 construct>=2.10.70
bidict bidict
@@ -15,4 +15,3 @@ git+https://github.com/osmocom/asn1tools
packaging packaging
git+https://github.com/hologram-io/smpp.pdu git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
psycopg2-binary

View File

@@ -21,7 +21,7 @@ setup(
"pyscard", "pyscard",
"pyserial", "pyserial",
"pytlv", "pytlv",
"cmd2 >= 1.5.0, < 3.0", "cmd2 >= 1.5.0",
"jsonpath-ng", "jsonpath-ng",
"construct >= 2.10.70", "construct >= 2.10.70",
"bidict", "bidict",
@@ -34,7 +34,6 @@ setup(
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu", "smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
"asn1tools", "asn1tools",
"smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted", "smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted",
"psycopg2-binary"
], ],
scripts=[ scripts=[
'pySim-prog.py', 'pySim-prog.py',

View File

@@ -1,7 +1,7 @@
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIICgzCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ MIICgjCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB SVQwHhcNMjQwNzA5MTUyOTM2WhcNMjUwODExMTUyOTM2WjAzMQ0wCwYDVQQKDARB
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
@@ -10,7 +10,7 @@ A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0 JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0gA cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0cA
MEUCIQCfaGcMk+kuSJsbIyRPWttwWNftwQdHCQuu346PaiA2FAIgUrqhPw2um9gV MEQCIHHmXEy9mgudh/VbK0hJwmX7eOgbvHLnlujrpQzvUd4uAiBFVJgSdzYvrmJ9
C+eWHaXio7WQh5L6VgLZzNifTQcldD4= 5yeIvmjHwxSMBgQp2dde7OtdVEK8Kw==
-----END CERTIFICATE----- -----END CERTIFICATE-----

View File

@@ -1,7 +1,7 @@
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIICgjCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ MIICgzCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB SVQwHhcNMjQwNzA5MTUyODMzWhcNMjUwODExMTUyODMzWjAzMQ0wCwYDVQQKDARB
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
@@ -10,7 +10,7 @@ VR0gBA0wCzAJBgdngRIBAgEDMB0GA1UdDgQWBBQn/vHyKRh+x4Pt9uApZKRRjVfU
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0 qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSQAw
RQIhAL+1lp/hGsj87/5RqOX2u3hS/VSftDN7EPrHJJFnTXLRAiBVxemKIKmC7+W1 RgIhAL1qQ/cnrCZC7UnnLJ8WeK+0aWUJFWh1cOlBEzw0NlTVAiEA25Vf4WHzwmJi
+RsTY5I51R+Cyoq4l5TEU49eplo5bw== zkARzxJ1qB0qfBofuJrtfPM4gNJ4Quw=
-----END CERTIFICATE----- -----END CERTIFICATE-----

View File

@@ -0,0 +1 @@
/tmp/sm-dp-sessions-BRP

View File

@@ -0,0 +1 @@
/tmp/sm-dp-sessions-NIST

View File

@@ -2,7 +2,7 @@ Detected UICC Add-on "SIM"
Detected UICC Add-on "GSM-R" Detected UICC Add-on "GSM-R"
Detected UICC Add-on "RUIM" Detected UICC Add-on "RUIM"
Can't read AIDs from SIM -- 'list' object has no attribute 'lower' Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
EF.DIR seems to be empty! warning: EF.DIR seems to be empty!
ADF.ECASD: a0000005591010ffffffff8900000200 ADF.ECASD: a0000005591010ffffffff8900000200
ADF.ISD-R: a0000005591010ffffffff8900000100 ADF.ISD-R: a0000005591010ffffffff8900000100
ISIM: a0000000871004 ISIM: a0000000871004

View File

@@ -1,5 +0,0 @@
"card_type_id","formfactor_id","imsi","iccid","pin1","puk1","pin2","puk2","ki","adm1","adm2","proprietary","kic1","kic2","kic3","kid1","kid2","kid3","kik1","kik2","kik3","msisdn","acc","opc"
"myCardType","3FF","901700000000001","8988211000000000001","1234","12345678","1223","12345678","AAAAAAAAAAA5435425AAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 01","BBBBBBBBBB3324BBBBBBBB21212BBBBB","CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC","DDDD90DDDDDDDDDDDDDDDDDD767DDDDD","EEEEEE567657567567EEEEEEEEEEEEEE","FFFFFFFFFFFFFFFFFFF56765765FFFFF","11111567811111111111111111111111","22222222222222222227669999222222","33333333333333333333333333333333","44444444444444445234544444444444","55555555555","0001","66666666666666666666666666666666"
"myCardType","3FF","901700000000002","8988211000000000002","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA","10101010","9999999999999999","proprietary data 02","BBBBBB421BBBBBBBBBB12BBBBBBBBBBB","CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC","DDDDDDDDD567657DDDD2DDDDDDDDDDDD","EEEEEEEE56756EEEEEEEEE567657EEEE","FFFFF567657FFFFFFFFFFFFFFFFFFFFF","11111111111146113433411576511111","22222222222223432225765222222222","33333333333333523453333333333333","44425435234444444444446544444444","55555555555","0001","66666666666666266666666666666666"
"myCardType","3FF","901700000000003","8988211000000000003","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 03","BBBBBBB45678BBBB756765BBBBBBBBBB","CCCCCCCCCCCCCC76543CCCC56765CCCC","DDDDDDDDDDDDDDDDDD5676575DDDDDDD","EEEEEEEEEEEEEEEEEE56765EEEEEEEEE","FFFFFFFFFFFFFFF567657FFFFFFFFFFF","11111111119876511111111111111111","22222222222444422222222222576522","33333332543333576733333333333333","44444444444567657567444444444444","55555555555","0001","66666675676575666666666666666666"
1 card_type_id formfactor_id imsi iccid pin1 puk1 pin2 puk2 ki adm1 adm2 proprietary kic1 kic2 kic3 kid1 kid2 kid3 kik1 kik2 kik3 msisdn acc opc
2 myCardType 3FF 901700000000001 8988211000000000001 1234 12345678 1223 12345678 AAAAAAAAAAA5435425AAAAAAAAAAAAAA 10101010 9999999999999999 proprietary data 01 BBBBBBBBBB3324BBBBBBBB21212BBBBB CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC DDDD90DDDDDDDDDDDDDDDDDD767DDDDD EEEEEE567657567567EEEEEEEEEEEEEE FFFFFFFFFFFFFFFFFFF56765765FFFFF 11111567811111111111111111111111 22222222222222222227669999222222 33333333333333333333333333333333 44444444444444445234544444444444 55555555555 0001 66666666666666666666666666666666
3 myCardType 3FF 901700000000002 8988211000000000002 1234 12345678 1223 12345678 AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA 10101010 9999999999999999 proprietary data 02 BBBBBB421BBBBBBBBBB12BBBBBBBBBBB CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC DDDDDDDDD567657DDDD2DDDDDDDDDDDD EEEEEEEE56756EEEEEEEEE567657EEEE FFFFF567657FFFFFFFFFFFFFFFFFFFFF 11111111111146113433411576511111 22222222222223432225765222222222 33333333333333523453333333333333 44425435234444444444446544444444 55555555555 0001 66666666666666266666666666666666
4 myCardType 3FF 901700000000003 8988211000000000003 1234 12345678 1223 12345678 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA 10101010 9999999999999999 proprietary data 03 BBBBBBB45678BBBB756765BBBBBBBBBB CCCCCCCCCCCCCC76543CCCC56765CCCC DDDDDDDDDDDDDDDDDD5676575DDDDDDD EEEEEEEEEEEEEEEEEE56765EEEEEEEEE FFFFFFFFFFFFFFF567657FFFFFFFFFFF 11111111119876511111111111111111 22222222222444422222222222576522 33333332543333576733333333333333 44444444444567657567444444444444 55555555555 0001 66666675676575666666666666666666

View File

@@ -1,152 +0,0 @@
#!/usr/bin/env python3
import unittest
import os
from pySim.card_key_provider import *
class TestCardKeyProviderCsv(unittest.TestCase):
def __init__(self, *args, **kwargs):
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
"OPC" : "000102030405065545645645645D0E0F",
"KIC1" : "06410203546406456456450B0C0D0E0F",
"KID1" : "00040267840507667609045645645E0F",
"KIK1" : "0001020307687607668678678C0D0E0F",
"KIC2" : "000142457594860706090A0B0688678F",
"KID2" : "600102030405649468690A0B0C0D648F",
"KIK2" : "00010203330506070496330B08640E0F",
"KIC3" : "000104030405064684686A068C0D0E0F",
"KID3" : "00010243048468070809060B0C0D0E0F",
"KIK3" : "00010204040506070809488B0C0D0E0F"}
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
super().__init__(*args, **kwargs)
def test_card_key_provider_get(self):
test_data = [{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': '48a6d5f60567d45299e3ba08594009e7', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '3eb8567fa0b4b1e63bcab13bff5f2702',
'KIC2': 'fd6c173a5b3f04b563808da24237fb46',
'KIC3': '66c8c848e5dff69d70689d155d44f323',
'KID1': 'd78accce870332dced467c173244dd94',
'KID2': 'b3bf050969747b2d2c9389e127a3d791',
'KID3': '40a77deb50d260b3041bbde1b5040625',
'KIK1': '451b503239d818ea34421aa9c2a8887a',
'KIK2': '967716f5fca8ae179f87f76524d1ae6b',
'KIK3': '0884db5eee5409a00fc1bbc57ac52541',
'OPC': '81817574c1961dd272ad080eb2caf279'}, 'ICCID' :"8988211000000000001"},
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': 'e94d7fa6fb92375dae86744ff6ecef49', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '79b4e39387c66253da68f653381ded44',
'KIC2': '560561b5dba89c1da8d1920049e5e4f7',
'KIC3': '79ff35e84e39305a119af8c79f84e8e5',
'KID1': '233baf89122159553d67545ecedcf8e0',
'KID2': '8fc2874164d7a8e40d72c968bc894ab8',
'KID3': '2e3320f0dda85054d261be920fbfa065',
'KIK1': 'd51b1b17630103d1672a3e9e0e4827ed',
'KIK2': 'd01edbc48be555139506b0d7982bf7ff',
'KIK3': 'a6487a5170849e8e0a03026afea91f5a',
'OPC': '6b0d19ef28bd12f2daac31828d426939'}, 'ICCID' :"8988211000000000002"},
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': '3cdec1552ef433a89f327905213c5a6e', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '72986b13ce505e12653ad42df5cfca13',
'KIC2': '8f0d1e58b01e833773e5562c4940674d',
'KIC3': '9c72ba5a14d54f489edbffd3d8802f03',
'KID1': 'd23a42995df9ca83f74b2cfd22695526',
'KID2': '5c3a189d12aa1ac6614883d7de5e6c8c',
'KID3': 'a6ace0d303a2b38a96b418ab83c16725',
'KIK1': 'bf2319467d859c12527aa598430caef2',
'KIK2': '6a4c459934bea7e40787976b8881ab01',
'KIK3': '91cd02c38b5f68a98cc90a1f2299538f',
'OPC': '6df46814b1697daca003da23808bbbc3'}, 'ICCID' :"8988211000000000003"}]
for t in test_data:
result = card_key_provider_get(["PIN1","PUK1","PIN2","PUK2","KI","ADM1","ADM2","KIC1",
"KIC2","KIC3","KID1","KID2","KID3","KIK1","KIK2","KIK3","OPC"],
"ICCID", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
result = card_key_provider_get(["PIN1","puk1","PIN2","PUK2","KI","adm1","ADM2","KIC1",
"KIC2","kic3","KID1","KID2","KID3","kik1","KIK2","KIK3","OPC"],
"iccid", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
def test_card_key_provider_get_field(self):
test_data = [{'EXPECTED' : "3eb8567fa0b4b1e63bcab13bff5f2702", 'ICCID' :"8988211000000000001"},
{'EXPECTED' : "79b4e39387c66253da68f653381ded44", 'ICCID' :"8988211000000000002"},
{'EXPECTED' : "72986b13ce505e12653ad42df5cfca13", 'ICCID' :"8988211000000000003"}]
for t in test_data:
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
for t in test_data:
result = card_key_provider_get_field("kic1", "iccid", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
class TestCardKeyFieldCryptor(unittest.TestCase):
def __init__(self, *args, **kwargs):
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
"OPC" : "000102030405065545645645645D0E0F",
"KIC1" : "06410203546406456456450B0C0D0E0F",
"UICC_SCP03" : "00040267840507667609045645645E0F"}
self.crypt = CardKeyFieldCryptor(transport_keys)
super().__init__(*args, **kwargs)
def test_encrypt_field(self):
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
def test_decrypt_field(self):
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,121 +0,0 @@
#!/usr/bin/env python3
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import logging
from pySim.log import PySimLogger
import io
import sys
from inspect import currentframe, getframeinfo
log = PySimLogger.get("TEST")
TEST_MSG_DEBUG = "this is a debug message"
TEST_MSG_INFO = "this is an info message"
TEST_MSG_WARNING = "this is a warning message"
TEST_MSG_ERROR = "this is an error message"
TEST_MSG_CRITICAL = "this is a critical message"
expected_message = None
class PySimLogger_Test(unittest.TestCase):
def __test_01_safe_defaults_one(self, callback, message:str):
# When log messages are sent to an unconfigured PySimLogger class, we expect the unmodified message being
# logged to stdout, just as if it were printed via a normal print() statement.
log_output = io.StringIO()
sys.stdout = log_output
callback(message)
assert(log_output.getvalue().strip() == message)
sys.stdout = sys.__stdout__
def test_01_safe_defaults(self):
# When log messages are sent to an unconfigured PySimLogger class, we expect that all messages are logged,
# regardless of the logging level.
self.__test_01_safe_defaults_one(log.debug, TEST_MSG_DEBUG)
self.__test_01_safe_defaults_one(log.info, TEST_MSG_INFO)
self.__test_01_safe_defaults_one(log.warning, TEST_MSG_WARNING)
self.__test_01_safe_defaults_one(log.error, TEST_MSG_ERROR)
self.__test_01_safe_defaults_one(log.critical, TEST_MSG_CRITICAL)
@staticmethod
def _test_print_callback(message):
assert(message.strip() == expected_message)
def test_02_normal(self):
# When the PySimLogger is set up with its default values, we expect formatted log messages on all logging
# levels.
global expected_message
PySimLogger.setup(self._test_print_callback)
expected_message = "DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG)
expected_message = "INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = "WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = "ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
def test_03_verbose(self):
# When the PySimLogger is set up with its default values, we expect verbose formatted log messages on all
# logging levels.
global expected_message
PySimLogger.setup(self._test_print_callback)
PySimLogger.set_verbose(True)
frame = currentframe()
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
def test_04_level(self):
# When the PySimLogger is set up with its default values, we expect formatted log messages but since we will
# limit the log level to INFO, we should not see any messages of level DEBUG
global expected_message
PySimLogger.setup(self._test_print_callback)
PySimLogger.set_level(logging.INFO)
# We test this in non verbose mode, this will also confirm that disabeling the verbose mode works.
PySimLogger.set_verbose(False)
# Debug messages should not appear
expected_message = None
log.debug(TEST_MSG_DEBUG)
# All other messages should appear normally
expected_message = "INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = "WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = "ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
if __name__ == '__main__':
unittest.main()