mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
Compare commits
43 Commits
pmaier/put
...
neels/saip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3677e0432e | ||
|
|
d16d8c61c4 | ||
|
|
f8fb3cfdeb | ||
|
|
575d1a3158 | ||
|
|
3662285b4b | ||
|
|
b4b8582c0b | ||
|
|
e59a623201 | ||
|
|
6e31fd85f2 | ||
|
|
00fa37ebda | ||
|
|
14347ad6d4 | ||
|
|
501f237e37 | ||
|
|
2a6e498e82 | ||
|
|
4d555f4b8d | ||
|
|
c831b3c3c3 | ||
|
|
647af01c41 | ||
|
|
7d0cde74a0 | ||
|
|
f3251d3214 | ||
|
|
6b68e7b54d | ||
|
|
58aafe36c7 | ||
|
|
a9d3cf370d | ||
|
|
8785747d24 | ||
|
|
1ec0263ffc | ||
|
|
9baafc1771 | ||
|
|
588d06cd9d | ||
|
|
565deff488 | ||
|
|
dc97895447 | ||
|
|
52e84a0bad | ||
|
|
065377eb0e | ||
|
|
7711bd26fb | ||
|
|
a62b58ce2c | ||
|
|
1c622a6101 | ||
|
|
7cc607e73b | ||
|
|
b697cc497e | ||
|
|
a8f3962be3 | ||
|
|
dd42978285 | ||
|
|
90c8fa63d8 | ||
|
|
d2373008f6 | ||
|
|
c8e18ece80 | ||
|
|
50b2619a2d | ||
|
|
85145e0b6b | ||
|
|
d638757af2 | ||
|
|
22da7b1a96 | ||
|
|
8e6a19d9f0 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,5 +1,5 @@
|
||||
*.pyc
|
||||
.*.sw?
|
||||
.*.swp
|
||||
|
||||
/docs/_*
|
||||
/docs/generated
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# A tool to analyze the eUICC simaResponse (series of EUICCResponse)
|
||||
#
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import argparse
|
||||
from osmocom.utils import h2b, b2h
|
||||
from osmocom.tlv import bertlv_parse_one, bertlv_encode_tag, bertlv_encode_len
|
||||
from pySim.esim.saip import *
|
||||
|
||||
parser = argparse.ArgumentParser(description="""Utility program to analyze the contents of an eUICC simaResponse.""")
|
||||
parser.add_argument('SIMA_RESPONSE', help='Hexstring containing the simaResponse as received from the eUICC')
|
||||
|
||||
def split_sima_response(sima_response):
|
||||
"""split an eUICC simaResponse field into a list of EUICCResponse fields"""
|
||||
|
||||
remainder = sima_response
|
||||
result = []
|
||||
while len(remainder):
|
||||
tdict, l, v, next_remainder = bertlv_parse_one(remainder)
|
||||
rawtag = bertlv_encode_tag(tdict)
|
||||
rawlen = bertlv_encode_len(l)
|
||||
result = result + [remainder[0:len(rawtag) + len(rawlen) + l]]
|
||||
remainder = next_remainder
|
||||
return result
|
||||
|
||||
def analyze_status(status):
|
||||
"""
|
||||
Convert a status code (integer) into a human readable string
|
||||
(see eUICC Profile Package: Interoperable Format Technical Specification, section 8.11)
|
||||
"""
|
||||
|
||||
# SIMA status codes
|
||||
string_values = {0 : 'ok',
|
||||
1 : 'pe-not-supported',
|
||||
2 : 'memory-failure',
|
||||
3 : 'bad-values',
|
||||
4 : 'not-enough-memory',
|
||||
5 : 'invalid-request-format',
|
||||
6 : 'invalid-parameter',
|
||||
7 : 'runtime-not-supported',
|
||||
8 : 'lib-not-supported',
|
||||
9 : 'template-not-supported ',
|
||||
10 : 'feature-not-supported',
|
||||
11 : 'pin-code-missing',
|
||||
31 : 'unsupported-profile-version'}
|
||||
|
||||
string_value = string_values.get(status, None)
|
||||
if string_value is not None:
|
||||
return "%d = %s (SIMA status code)" % (status, string_value)
|
||||
|
||||
# ISO 7816 status words
|
||||
if status >= 24576 and status <= 28671:
|
||||
return "%d = %04x (ISO7816 status word)" % (status, status)
|
||||
elif status >= 36864 and status <= 40959:
|
||||
return "%d = %04x (ISO7816 status word)" % (status, status)
|
||||
|
||||
# Proprietary status codes
|
||||
elif status >= 40960 and status <= 65535:
|
||||
return "%d = %04x (proprietary)" % (status, status)
|
||||
|
||||
# Unknown status codes
|
||||
return "%d (unknown, proprietary?)" % status
|
||||
|
||||
def analyze_euicc_response(euicc_response):
|
||||
"""Analyze and display the contents of an EUICCResponse"""
|
||||
|
||||
print(" EUICCResponse: %s" % b2h(euicc_response))
|
||||
euicc_response_decoded = asn1.decode('EUICCResponse', euicc_response)
|
||||
|
||||
pe_status = euicc_response_decoded.get('peStatus')
|
||||
print(" peStatus:")
|
||||
for s in pe_status:
|
||||
print(" status: %s" % analyze_status(s.get('status')))
|
||||
print(" identification: %s" % str(s.get('identification', None)))
|
||||
print(" additional-information: %s" % str(s.get('additional-information', None)))
|
||||
print(" offset: %s" % str(s.get('offset', None)))
|
||||
|
||||
if euicc_response_decoded.get('profileInstallationAborted', False) is None:
|
||||
# This type is defined as profileInstallationAborted NULL OPTIONAL, so when it is present it
|
||||
# will have the value None, otherwise it is simply not present.
|
||||
print(" profileInstallationAborted: True")
|
||||
else:
|
||||
print(" profileInstallationAborted: False")
|
||||
|
||||
status_message = euicc_response_decoded.get('statusMessage', None)
|
||||
print(" statusMessage: %s" % str(status_message))
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
sima_response = h2b(opts.SIMA_RESPONSE);
|
||||
|
||||
print("simaResponse: %s" % b2h(sima_response))
|
||||
euicc_response_list = split_sima_response(sima_response)
|
||||
|
||||
for euicc_response in euicc_response_list:
|
||||
analyze_euicc_response(euicc_response)
|
||||
@@ -1,304 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import csv
|
||||
import sys
|
||||
import os
|
||||
import yaml
|
||||
import psycopg2
|
||||
from psycopg2.sql import Identifier, SQL
|
||||
from pathlib import Path
|
||||
from pySim.log import PySimLogger
|
||||
from packaging import version
|
||||
|
||||
log = PySimLogger.get(Path(__file__).stem)
|
||||
|
||||
class CardKeyDatabase:
|
||||
def __init__(self, config_filename: str, table_name: str, create_table: bool = False, admin: bool = False):
|
||||
"""
|
||||
Initialize database connection and set the table which shall be used as storage for the card key data.
|
||||
In case the specified table does not exist yet it can be created using the create_table_type parameter.
|
||||
|
||||
New tables are always minimal tables which follow a pre-defined table scheme. The user may extend the table
|
||||
with additional columns using the add_cols() later.
|
||||
|
||||
Args:
|
||||
tablename : name of the database table to create.
|
||||
create_table_type : type of the table to create ('UICC' or 'EUICC')
|
||||
"""
|
||||
|
||||
def user_from_config_file(config, role: str) -> tuple[str, str]:
|
||||
db_users = config.get('db_users')
|
||||
user = db_users.get(role)
|
||||
if user is None:
|
||||
raise ValueError("user for role '%s' not set up in config file." % role)
|
||||
return user.get('name'), user.get('pass')
|
||||
|
||||
self.table = table_name.lower()
|
||||
self.cols = None
|
||||
|
||||
# Depending on the table type, the table name must contain either the substring "uicc_keys" or "euicc_keys".
|
||||
# This convention will allow us to deduct the table type from the table name.
|
||||
if "euicc_keys" not in table_name and "uicc_keys" not in table_name:
|
||||
raise ValueError("Table name (%s) should contain the substring \"uicc_keys\" or \"euicc_keys\"" % table_name)
|
||||
|
||||
# Read config file
|
||||
log.info("Using config file: %s", config_filename)
|
||||
with open(config_filename, "r") as cfg:
|
||||
config = yaml.load(cfg, Loader=yaml.FullLoader)
|
||||
host = config.get('host')
|
||||
log.info("Database host: %s", host)
|
||||
db_name = config.get('db_name')
|
||||
log.info("Database name: %s", db_name)
|
||||
table_names = config.get('table_names')
|
||||
username_admin, password_admin = user_from_config_file(config, 'admin')
|
||||
username_importer, password_importer = user_from_config_file(config, 'importer')
|
||||
username_reader, _ = user_from_config_file(config, 'reader')
|
||||
|
||||
# Switch between admin and importer user
|
||||
if admin:
|
||||
username, password = username_admin, password_admin
|
||||
else:
|
||||
username, password = username_importer, password_importer
|
||||
|
||||
# Create database connection
|
||||
log.info("Database user: %s", username)
|
||||
self.conn = psycopg2.connect(dbname=db_name, user=username, password=password, host=host)
|
||||
self.cur = self.conn.cursor()
|
||||
|
||||
# In the context of this tool it is not relevant if the table name is present in the config file. However,
|
||||
# pySim-shell.py will require the table name to be configured properly to access the database table.
|
||||
if self.table not in table_names:
|
||||
log.warning("Specified table name (%s) is not yet present in config file (required for access from pySim-shell.py)",
|
||||
self.table)
|
||||
|
||||
# Create a new minimal database table of the specified table type.
|
||||
if create_table:
|
||||
if not admin:
|
||||
raise ValueError("creation of new table refused, use option --admin and try again.")
|
||||
if "euicc_keys" in self.table:
|
||||
self.__create_table(username_reader, username_importer, ['EID'])
|
||||
elif "uicc_keys" in self.table:
|
||||
self.__create_table(username_reader, username_importer, ['ICCID', 'IMSI'])
|
||||
|
||||
# Ensure a table with the specified name exists
|
||||
log.info("Database table: %s", self.table)
|
||||
if self.get_cols() == []:
|
||||
raise ValueError("Table name (%s) does not exist yet" % self.table)
|
||||
log.info("Database table columns: %s", str(self.get_cols()))
|
||||
|
||||
def __create_table(self, user_reader:str, user_importer:str, cols:list[str]):
|
||||
"""
|
||||
Initialize a new table. New tables are always minimal tables with one primary key and additional index columns.
|
||||
Non index-columns may be added later using method _update_cols().
|
||||
"""
|
||||
|
||||
# Create table columns with primary key
|
||||
query = SQL("CREATE TABLE {} ({} VARCHAR PRIMARY KEY").format(Identifier(self.table),
|
||||
Identifier(cols[0].lower()))
|
||||
for c in cols[1:]:
|
||||
query += SQL(", {} VARCHAR").format(Identifier(c.lower()))
|
||||
query += SQL(");")
|
||||
self.cur.execute(query)
|
||||
|
||||
# Create indexes for all other columns
|
||||
for c in cols[1:]:
|
||||
self.cur.execute(query = SQL("CREATE INDEX {} ON {}({});").format(Identifier(c.lower()),
|
||||
Identifier(self.table),
|
||||
Identifier(c.lower())))
|
||||
|
||||
# Set permissions
|
||||
self.cur.execute(SQL("GRANT INSERT ON {} TO {};").format(Identifier(self.table),
|
||||
Identifier(user_importer)))
|
||||
self.cur.execute(SQL("GRANT SELECT ON {} TO {};").format(Identifier(self.table),
|
||||
Identifier(user_reader)))
|
||||
|
||||
log.info("New database table created: %s", self.table)
|
||||
|
||||
def get_cols(self) -> list[str]:
|
||||
"""
|
||||
Get a list of all columns available in the current table scheme.
|
||||
|
||||
Returns:
|
||||
list with column names (in uppercase) of the database table
|
||||
"""
|
||||
|
||||
# Return cached col list if present
|
||||
if self.cols:
|
||||
return self.cols
|
||||
|
||||
# Request a list of current cols from the database
|
||||
self.cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (self.table,))
|
||||
|
||||
cols_result = self.cur.fetchall()
|
||||
cols = []
|
||||
for c in cols_result:
|
||||
cols.append(c[0].upper())
|
||||
self.cols = cols
|
||||
return cols
|
||||
|
||||
def get_missing_cols(self, cols_expected:list[str]) -> list[str]:
|
||||
"""
|
||||
Check if the current table scheme lacks any of the given expected columns.
|
||||
|
||||
Returns:
|
||||
list with the missing columns.
|
||||
"""
|
||||
|
||||
cols_present = self.get_cols()
|
||||
return list(set(cols_expected) - set(cols_present))
|
||||
|
||||
def add_cols(self, cols:list[str]):
|
||||
"""
|
||||
Update the current table scheme with additional columns. In case the updated columns are already exist, the
|
||||
table schema is not changed.
|
||||
|
||||
Args:
|
||||
table : name of the database table to alter
|
||||
cols : list with updated colum names to add
|
||||
"""
|
||||
|
||||
cols_missing = self.get_missing_cols(cols)
|
||||
|
||||
# Depending on the table type (see constructor), we either have a primary key 'ICCID' (for UICC data), or 'EID'
|
||||
# (for eUICC data). Both table formats different types of data and have rather differen columns also. Let's
|
||||
# prevent the excidentally mixing of both types.
|
||||
if 'ICCID' in cols_missing:
|
||||
raise ValueError("Table %s stores eUCCC key material, refusing to add UICC specific column 'ICCID'" % self.table)
|
||||
if 'EID' in cols_missing:
|
||||
raise ValueError("Table %s stores UCCC key material, refusing to add eUICC specific column 'EID'" % self.table)
|
||||
|
||||
# Add the missing columns to the table
|
||||
self.cols = None
|
||||
for c in cols_missing:
|
||||
self.cur.execute(query = SQL("ALTER TABLE {} ADD {} VARCHAR;").format(Identifier(self.table),
|
||||
Identifier(c.lower())))
|
||||
|
||||
def insert_row(self, row:dict[str, str]):
|
||||
"""
|
||||
Insert a new row into the database table.
|
||||
|
||||
Args:
|
||||
row : dictionary with the colum names and their designated values
|
||||
"""
|
||||
|
||||
# Check if the row is compatible with the current table scheme
|
||||
cols_expected = list(row.keys())
|
||||
cols_missing = self.get_missing_cols(cols_expected)
|
||||
if cols_missing != []:
|
||||
raise ValueError("table %s has incompatible format, the row %s contains unknown cols %s" %
|
||||
(self.table, str(row), str(cols_missing)))
|
||||
|
||||
# Insert row into datbase table
|
||||
row_keys = list(row.keys())
|
||||
row_values = list(row.values())
|
||||
query = SQL("INSERT INTO {} ").format(Identifier(self.table))
|
||||
query += SQL("({} ").format(Identifier(row_keys[0].lower()))
|
||||
for k in row_keys[1:]:
|
||||
query += SQL(", {}").format(Identifier(k.lower()))
|
||||
query += SQL(") VALUES (%s")
|
||||
for v in row_values[1:]:
|
||||
query += SQL(", %s")
|
||||
query += SQL(");")
|
||||
self.cur.execute(query, row_values)
|
||||
|
||||
def commit(self):
|
||||
self.conn.commit()
|
||||
log.info("Changes to table %s committed!", self.table)
|
||||
|
||||
def open_csv(opts: argparse.Namespace):
|
||||
log.info("CSV file: %s", opts.csv)
|
||||
csv_file = open(opts.csv, 'r')
|
||||
cr = csv.DictReader(csv_file)
|
||||
if not cr:
|
||||
raise RuntimeError("could not open DictReader for CSV-File '%s'" % opts.csv)
|
||||
cr.fieldnames = [field.upper() for field in cr.fieldnames]
|
||||
log.info("CSV file columns: %s", str(cr.fieldnames))
|
||||
return cr
|
||||
|
||||
def open_db(cr: csv.DictReader, opts: argparse.Namespace) -> CardKeyDatabase:
|
||||
try:
|
||||
db = CardKeyDatabase(os.path.expanduser(opts.pgsql), opts.table_name, opts.create_table, opts.admin)
|
||||
|
||||
# Check CSV format against table schema, add missing columns
|
||||
cols_missing = db.get_missing_cols(cr.fieldnames)
|
||||
if cols_missing != [] and (opts.update_columns or opts.create_table):
|
||||
log.info("Adding missing columns: %s", str(cols_missing))
|
||||
db.add_cols(cols_missing)
|
||||
cols_missing = db.get_missing_cols(cr.fieldnames)
|
||||
|
||||
# Make sure the table schema has no missing columns
|
||||
if cols_missing != []:
|
||||
log.error("Database table lacks CSV file columns: %s -- import aborted!", cols_missing)
|
||||
sys.exit(2)
|
||||
except Exception as e:
|
||||
log.error(str(e).strip())
|
||||
log.error("Database initialization aborted due to error!")
|
||||
sys.exit(2)
|
||||
|
||||
return db
|
||||
|
||||
def import_from_csv(db: CardKeyDatabase, cr: csv.DictReader):
|
||||
count = 0
|
||||
for row in cr:
|
||||
try:
|
||||
db.insert_row(row)
|
||||
count+=1
|
||||
if count % 100 == 0:
|
||||
log.info("CSV file import in progress, %d rows imported...", count)
|
||||
except Exception as e:
|
||||
log.error(str(e).strip())
|
||||
log.error("CSV file import aborted due to error, no datasets committed!")
|
||||
sys.exit(2)
|
||||
log.info("CSV file import done, %d rows imported", count)
|
||||
|
||||
if __name__ == '__main__':
|
||||
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
|
||||
option_parser.add_argument('--pgsql', metavar='FILE',
|
||||
default="~/.osmocom/pysim/card_data_pgsql.cfg",
|
||||
help='Read card data from PostgreSQL database (config file)')
|
||||
option_parser.add_argument('--csv', metavar='FILE', help='input CSV file with card data', required=True)
|
||||
option_parser.add_argument("--table-name", help="name of the card key table", type=str, required=True)
|
||||
option_parser.add_argument("--update-columns", help="add missing table columns", action='store_true', default=False)
|
||||
option_parser.add_argument("--create-table", action='store_true', help="create new card key table", default=False)
|
||||
option_parser.add_argument("--admin", action='store_true', help="perform action as admin", default=False)
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
PySimLogger.setup(print, {logging.WARN: "\033[33m"})
|
||||
if (opts.verbose):
|
||||
PySimLogger.set_verbose(True)
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
|
||||
# Open CSV file
|
||||
cr = open_csv(opts)
|
||||
|
||||
# Open database, create initial table, update column scheme
|
||||
db = open_db(cr, opts)
|
||||
|
||||
# Progress with import
|
||||
if not opts.admin:
|
||||
import_from_csv(db, cr)
|
||||
|
||||
# Commit changes to the database
|
||||
db.commit()
|
||||
@@ -1,100 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import logging
|
||||
import json
|
||||
import asn1tools
|
||||
import asn1tools.codecs.ber
|
||||
import asn1tools.codecs.der
|
||||
import pySim.esim.rsp as rsp
|
||||
import pySim.esim.saip as saip
|
||||
from pySim.esim.es2p import param, Es2pApiServerMno, Es2pApiServerHandlerMno
|
||||
from osmocom.utils import b2h
|
||||
from datetime import datetime
|
||||
from analyze_simaResponse import split_sima_response
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(Path(__file__).stem)
|
||||
|
||||
parser = argparse.ArgumentParser(description="""
|
||||
Utility to receive and log requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
parser.add_argument("--host", help="Host/IP to bind HTTP(S) to", default="localhost")
|
||||
parser.add_argument("--port", help="TCP port to bind HTTP(S) to", default=443, type=int)
|
||||
parser.add_argument('--server-cert', help='X.509 server certificate used to provide the ES2+ HTTPs service')
|
||||
parser.add_argument('--client-ca-cert', help='X.509 CA certificates to authenticate the requesting client(s)')
|
||||
parser.add_argument("-v", "--verbose", help="enable debug output", action='store_true', default=False)
|
||||
|
||||
def decode_sima_response(sima_response):
|
||||
decoded = []
|
||||
euicc_response_list = split_sima_response(sima_response)
|
||||
for euicc_response in euicc_response_list:
|
||||
decoded.append(saip.asn1.decode('EUICCResponse', euicc_response))
|
||||
return decoded
|
||||
|
||||
def decode_result_data(result_data):
|
||||
return rsp.asn1.decode('PendingNotification', result_data)
|
||||
|
||||
def decode(data, path="/"):
|
||||
if data is None:
|
||||
return 'none'
|
||||
elif type(data) is datetime:
|
||||
return data.isoformat()
|
||||
elif type(data) is tuple:
|
||||
return {str(data[0]) : decode(data[1], path + str(data[0]) + "/")}
|
||||
elif type(data) is list:
|
||||
new_data = []
|
||||
for item in data:
|
||||
new_data.append(decode(item, path))
|
||||
return new_data
|
||||
elif type(data) is bytes:
|
||||
return b2h(data)
|
||||
elif type(data) is dict:
|
||||
new_data = {}
|
||||
for key, item in data.items():
|
||||
new_key = str(key)
|
||||
if path == '/' and new_key == 'resultData':
|
||||
new_item = decode_result_data(item)
|
||||
elif (path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/successResult/' \
|
||||
or path == '/resultData/profileInstallationResult/profileInstallationResultData/finalResult/errorResult/') \
|
||||
and new_key == 'simaResponse':
|
||||
new_item = decode_sima_response(item)
|
||||
else:
|
||||
new_item = item
|
||||
new_data[new_key] = decode(new_item, path + new_key + "/")
|
||||
return new_data
|
||||
else:
|
||||
return data
|
||||
|
||||
class Es2pApiServerHandlerForLogging(Es2pApiServerHandlerMno):
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
logging.info("ES2+:handleDownloadProgressInfo: %s" % json.dumps(decode(data)))
|
||||
return {}, None
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING,
|
||||
format='%(asctime)s %(levelname)s %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
|
||||
Es2pApiServerMno(args.port, args.host, Es2pApiServerHandlerForLogging(), args.server_cert, args.client_ca_cert)
|
||||
|
||||
@@ -126,14 +126,14 @@ class Es9pClient:
|
||||
if self.opts.iccid:
|
||||
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
|
||||
|
||||
if self.opts.operation == 'install':
|
||||
if self.opts.operation == 'download':
|
||||
pird = {
|
||||
'transactionId': h2b(self.opts.transaction_id),
|
||||
'transactionId': self.opts.transaction_id,
|
||||
'notificationMetadata': ntf_metadata,
|
||||
'smdpOid': self.opts.smdpp_oid,
|
||||
'finalResult': ('successResult', {
|
||||
'aid': h2b(self.opts.isdp_aid),
|
||||
'simaResponse': h2b(self.opts.sima_response),
|
||||
'aid': self.opts.isdp_aid,
|
||||
'simaResponse': self.opts.sima_response,
|
||||
}),
|
||||
}
|
||||
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||
|
||||
@@ -42,9 +42,6 @@ case "$JOB_TYPE" in
|
||||
|
||||
# Run pySim-shell integration tests (requires physical cards)
|
||||
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
|
||||
|
||||
# Run pySim-smpp2sim test
|
||||
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
|
||||
;;
|
||||
"distcheck")
|
||||
virtualenv -p python3 venv --system-site-packages
|
||||
|
||||
@@ -107,7 +107,7 @@ parser_esrv.add_argument('--output-file', required=True, help='Output file name'
|
||||
parser_esrv.add_argument('--add-flag', default=[], choices=esrv_flag_choices, action='append', help='Add flag to mandatory services list')
|
||||
parser_esrv.add_argument('--remove-flag', default=[], choices=esrv_flag_choices, action='append', help='Remove flag from mandatory services list')
|
||||
|
||||
parser_tree = subparsers.add_parser('tree', help='Display the filesystem tree')
|
||||
parser_info = subparsers.add_parser('tree', help='Display the filesystem tree')
|
||||
|
||||
def write_pes(pes: ProfileElementSequence, output_file:str):
|
||||
"""write the PE sequence to a file"""
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Harald Welte, Philipp Maier
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import smpplib.gsm
|
||||
import smpplib.client
|
||||
import smpplib.consts
|
||||
import time
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms, OtaAlgoCrypt, OtaAlgoAuth, CNTR_REQ, RC_CC_DS, POR_REQ
|
||||
from pySim.utils import b2h, h2b, is_hexstr
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(Path(__file__).stem)
|
||||
|
||||
option_parser = argparse.ArgumentParser(description='Tool to send OTA SMS RFM/RAM messages via SMPP',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost")
|
||||
option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int)
|
||||
option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
|
||||
algo_crypt_choices = []
|
||||
algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
|
||||
for cls in algo_crypt_classes:
|
||||
algo_crypt_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2',
|
||||
help="OTA crypt algorithm")
|
||||
algo_auth_choices = []
|
||||
algo_auth_classes = OtaAlgoAuth.__subclasses__()
|
||||
for cls in algo_auth_classes:
|
||||
algo_auth_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2',
|
||||
help="OTA auth algorithm")
|
||||
option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)')
|
||||
option_parser.add_argument('--kic-idx', default=1, type=int, help='OTA key index (KIC)')
|
||||
option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)')
|
||||
option_parser.add_argument('--kid-idx', default=1, type=int, help='OTA key index (KID)')
|
||||
option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter')
|
||||
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
|
||||
option_parser.add_argument("--cntr-req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
|
||||
help="Counter requirement")
|
||||
option_parser.add_argument('--no-ciphering', action='store_true', default=False, help='Disable ciphering')
|
||||
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument('--por-in-submit', action='store_true', default=False,
|
||||
help='require PoR to be sent via SMS-SUBMIT')
|
||||
option_parser.add_argument('--por-no-ciphering', action='store_true', default=False, help='Disable ciphering (PoR)')
|
||||
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument("--por-req", choices=POR_REQ.decmapping.values(), default='por_required',
|
||||
help="Proof of Receipt requirements")
|
||||
option_parser.add_argument('--src-addr', default='12', type=str, help='SMS source address (MSISDN)')
|
||||
option_parser.add_argument('--dest-addr', default='23', type=str, help='SMS destination address (MSISDN)')
|
||||
option_parser.add_argument('--timeout', default=10, type=int, help='Maximum response waiting time')
|
||||
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
|
||||
|
||||
class SmppHandler:
|
||||
client = None
|
||||
|
||||
def __init__(self, host: str, port: int,
|
||||
system_id: str, password: str,
|
||||
ota_keyset: OtaKeyset, spi: dict, tar: bytes):
|
||||
"""
|
||||
Initialize connection to SMPP server and set static OTA SMS-TPDU ciphering parameters
|
||||
Args:
|
||||
host : Hostname or IPv4/IPv6 address of the SMPP server
|
||||
port : TCP Port of the SMPP server
|
||||
system_id: SMPP System-ID used by ESME (client) to bind
|
||||
password: SMPP Password used by ESME (client) to bind
|
||||
ota_keyset: OTA keyset to be used for SMS-TPDU ciphering
|
||||
spi: Security Parameter Indicator (SPI) to be used for SMS-TPDU ciphering
|
||||
tar: Toolkit Application Reference (TAR) of the targeted card application
|
||||
"""
|
||||
|
||||
# Create and connect SMPP client
|
||||
client = smpplib.client.Client(host, port, allow_unknown_opt_params=True)
|
||||
client.set_message_sent_handler(self.message_sent_handler)
|
||||
client.set_message_received_handler(self.message_received_handler)
|
||||
client.connect()
|
||||
client.bind_transceiver(system_id=system_id, password=password)
|
||||
self.client = client
|
||||
|
||||
# Setup static OTA parameters
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.ota_keyset = ota_keyset
|
||||
self.tar = tar
|
||||
self.spi = spi
|
||||
|
||||
def __del__(self):
|
||||
if self.client:
|
||||
self.client.unbind()
|
||||
self.client.disconnect()
|
||||
|
||||
def message_received_handler(self, pdu):
|
||||
if pdu.short_message:
|
||||
logger.info("SMS-TPDU received: %s", b2h(pdu.short_message))
|
||||
try:
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||
except ValueError:
|
||||
# Retry to decoding with ciphering disabled (in case the card has problems to decode the SMS-TDPU
|
||||
# we have sent, the response will contain an unencrypted error message)
|
||||
spi = self.spi.copy()
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||
logger.info("SMS-TPDU decoded: %s", dec)
|
||||
self.response = dec
|
||||
return None
|
||||
|
||||
def message_sent_handler(self, pdu):
|
||||
logger.debug("SMS-TPDU sent: pdu_sequence=%s pdu_message_id=%s", pdu.sequence, pdu.message_id)
|
||||
|
||||
def transceive_sms_tpdu(self, tpdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple:
|
||||
"""
|
||||
Transceive SMS-TPDU. This method sends the SMS-TPDU to the SMPP server, and waits for a response. The method
|
||||
returns when the response is received.
|
||||
|
||||
Args:
|
||||
tpdu : short message content (plaintext)
|
||||
src_addr : short message source address
|
||||
dest_addr : short message destination address
|
||||
timeout : timeout after which this method should give up waiting for a response
|
||||
Returns:
|
||||
tuple containing the response (plaintext)
|
||||
"""
|
||||
|
||||
logger.info("SMS-TPDU sending: %s...", b2h(tpdu))
|
||||
|
||||
self.client.send_message(
|
||||
# TODO: add parameters to switch source_addr_ton and dest_addr_ton between SMPP_TON_INTL and SMPP_NPI_ISDN
|
||||
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
source_addr=src_addr,
|
||||
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
destination_addr=dest_addr,
|
||||
short_message=tpdu,
|
||||
# TODO: add parameters to set data_coding and esm_class
|
||||
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||
protocol_id=0x7f,
|
||||
# TODO: add parameter to use registered delivery
|
||||
# registered_delivery=True,
|
||||
)
|
||||
|
||||
logger.info("SMS-TPDU sent, waiting for response...")
|
||||
timestamp_sent=int(time.time())
|
||||
self.response = None
|
||||
while self.response is None:
|
||||
self.client.poll()
|
||||
if int(time.time()) - timestamp_sent > timeout:
|
||||
raise ValueError("Timeout reached, no response SMS-TPDU received!")
|
||||
return self.response
|
||||
|
||||
def transceive_apdu(self, apdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple[bytes, bytes]:
|
||||
"""
|
||||
Transceive APDU. This method wraps the given APDU into an SMS-TPDU, sends it to the SMPP server and waits for
|
||||
the response. When the response is received, the last response data and the last status word is extracted from
|
||||
the response and returned to the caller.
|
||||
|
||||
Args:
|
||||
apdu : one or more concatenated APDUs
|
||||
src_addr : short message source address
|
||||
dest_addr : short message destination address
|
||||
timeout : timeout after which this method should give up waiting for a response
|
||||
Returns:
|
||||
tuple containing the last response data and the last status word as byte strings
|
||||
"""
|
||||
|
||||
logger.info("C-APDU sending: %s...", b2h(apdu))
|
||||
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
response = self.transceive_sms_tpdu(tpdu, src_addr, dest_addr, timeout)
|
||||
|
||||
# Extract last_response_data and last_status_word from the response
|
||||
sw = None
|
||||
resp = None
|
||||
for container in response:
|
||||
if container:
|
||||
container_dict = dict(container)
|
||||
resp = container_dict.get('last_response_data')
|
||||
sw = container_dict.get('last_status_word')
|
||||
if resp is None:
|
||||
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
|
||||
if sw is None:
|
||||
raise ValueError("Response does not contain any last_status_word, no R-APDU received!")
|
||||
|
||||
logger.info("R-APDU received: %s %s", resp, sw)
|
||||
return h2b(resp), h2b(sw)
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)s %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
|
||||
if opts.kic_idx != opts.kid_idx:
|
||||
logger.warning("KIC index (%s) and KID index (%s) are different (security violation, card should reject message)",
|
||||
opts.kic_idx, opts.kid_idx)
|
||||
|
||||
ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
|
||||
kic_idx=opts.kic_idx,
|
||||
kic=h2b(opts.kic),
|
||||
algo_auth=opts.algo_auth,
|
||||
kid_idx=opts.kid_idx,
|
||||
kid=h2b(opts.kid),
|
||||
cntr=opts.cntr)
|
||||
spi = {'counter' : opts.cntr_req,
|
||||
'ciphering' : not opts.no_ciphering,
|
||||
'rc_cc_ds': opts.rc_cc_ds,
|
||||
'por_in_submit': opts.por_in_submit,
|
||||
'por_shall_be_ciphered': not opts.por_no_ciphering,
|
||||
'por_rc_cc_ds': opts.por_rc_cc_ds,
|
||||
'por': opts.por_req}
|
||||
apdu = h2b("".join(opts.apdu))
|
||||
|
||||
smpp_handler = SmppHandler(opts.host, opts.port, opts.system_id, opts.password, ota_keyset, spi, h2b(opts.tar))
|
||||
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
|
||||
print("%s %s" % (b2h(resp), b2h(sw)))
|
||||
@@ -1,4 +1,4 @@
|
||||
Retrieving card-individual keys via CardKeyProvider
|
||||
Retrieving card-individual keys via CardKeyProvider
|
||||
===================================================
|
||||
|
||||
When working with a batch of cards, or more than one card in general, it
|
||||
@@ -20,11 +20,9 @@ example develop your own CardKeyProvider that queries some kind of
|
||||
database for the key material, or that uses a key derivation function to
|
||||
derive card-specific key material from a global master key.
|
||||
|
||||
pySim already includes two CardKeyProvider implementations. One to retrieve
|
||||
key material from a CSV file (`CardKeyProviderCsv`) and a second one that allows
|
||||
to retrieve the key material from a PostgreSQL database (`CardKeyProviderPgsql`).
|
||||
Both implementations equally implement a column encryption scheme that allows
|
||||
to protect sensitive columns using a *transport key*
|
||||
The only actual CardKeyProvider implementation included in pySim is the
|
||||
`CardKeyProviderCsv` which retrieves the key material from a
|
||||
[potentially encrypted] CSV file.
|
||||
|
||||
|
||||
The CardKeyProviderCsv
|
||||
@@ -42,224 +40,11 @@ of pySim-shell. If you do not specify a CSV file, pySim will attempt to
|
||||
open a CSV file from the default location at
|
||||
`~/.osmocom/pysim/card_data.csv`, and use that, if it exists.
|
||||
|
||||
The `CardKeyProviderCsv` is suitable to manage small amounts of key material
|
||||
locally. However, if your card inventory is very large and the key material
|
||||
must be made available on multiple sites, the `CardKeyProviderPgsql` is the
|
||||
better option.
|
||||
|
||||
|
||||
The CardKeyProviderPgsql
|
||||
------------------------
|
||||
|
||||
With the `CardKeyProviderPgsql` you can use a PostgreSQL database as storage
|
||||
medium. The implementation comes with a CSV importer tool that consumes the
|
||||
same CSV files you would normally use with the `CardKeyProviderCsv`, so you
|
||||
can just use your existing CSV files and import them into the database.
|
||||
|
||||
|
||||
Requirements
|
||||
^^^^^^^^^^^^
|
||||
|
||||
The `CardKeyProviderPgsql` uses the `Psycopg` PostgreSQL database adapter
|
||||
(https://www.psycopg.org). `Psycopg` is not part of the default requirements
|
||||
of pySim-shell and must be installed separately. `Psycopg` is available as
|
||||
Python package under the name `psycopg2-binary`.
|
||||
|
||||
|
||||
Setting up the database
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
From the perspective of the database, the `CardKeyProviderPgsql` has only
|
||||
minimal requirements. You do not have to create any tables in advance. An empty
|
||||
database and at least one user that may create, alter and insert into tables is
|
||||
sufficient. However, for increased reliability and as a protection against
|
||||
incorrect operation, the `CardKeyProviderPgsql` supports a hierarchical model
|
||||
with three users (or roles):
|
||||
|
||||
* **admin**:
|
||||
This should be the owner of the database. It is intended to be used for
|
||||
administrative tasks like adding new tables or adding new columns to existing
|
||||
tables. This user should not be used to insert new data into tables or to access
|
||||
data from within pySim-shell using the `CardKeyProviderPgsql`
|
||||
|
||||
* **importer**:
|
||||
This user is used when feeding new data into an existing table. It should only
|
||||
be able to insert new rows into existing tables. It should not be used for
|
||||
administrative tasks or to access data from within pySim-shell using the
|
||||
`CardKeyProviderPgsql`
|
||||
|
||||
* **reader**:
|
||||
To access data from within pySim shell using the `CardKeyProviderPgsql` the
|
||||
reader user is the correct one to use. This user should have no write access
|
||||
to the database or any of the tables.
|
||||
|
||||
|
||||
Creating a config file
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The default location for the config file is `~/.osmocom/pysim/card_data_pgsql.cfg`
|
||||
The file uses `yaml` syntax and should look like the example below:
|
||||
|
||||
::
|
||||
|
||||
host: "127.0.0.1"
|
||||
db_name: "my_database"
|
||||
table_names:
|
||||
- "uicc_keys"
|
||||
- "euicc_keys"
|
||||
db_users:
|
||||
admin:
|
||||
name: "my_admin_user"
|
||||
pass: "my_admin_password"
|
||||
importer:
|
||||
name: "my_importer_user"
|
||||
pass: "my_importer_password"
|
||||
reader:
|
||||
name: "my_reader_user"
|
||||
pass: "my_reader_password"
|
||||
|
||||
This file is used by pySim-shell and by the importer tool. Both expect the file
|
||||
in the aforementioned location. In case you want to store the file in a
|
||||
different location you may use the `--pgsql` commandline option to provide a
|
||||
custom config file path.
|
||||
|
||||
The hostname and the database name for the PostgreSQL database is set with the
|
||||
`host` and `db_name` fields. The field `db_users` sets the user names and
|
||||
passwords for each of the aforementioned users (or roles). In case only a single
|
||||
admin user is used, all three entries may be populated with the same user name
|
||||
and password (not recommended)
|
||||
|
||||
The field `table_names` sets the tables that the `CardKeyProviderPgsql` shall
|
||||
use to query to locate card key data. You can set up as many tables as you
|
||||
want, `CardKeyProviderPgsql` will query them in order, one by one until a
|
||||
matching entry is found.
|
||||
|
||||
NOTE: In case you do not want to disclose the admin and the importer credentials
|
||||
to pySim-shell you may remove those lines. pySim-shell will only require the
|
||||
`reader` entry under `db_users`.
|
||||
|
||||
|
||||
Using the Importer
|
||||
^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Before data can be imported, you must first create a database table. Tables
|
||||
are created with the provided importer tool, which can be found under
|
||||
`contrib/csv-to-pgsql.py`. This tool is used to create the database table and
|
||||
read the data from the provided CSV file into the database.
|
||||
|
||||
As mentioned before, all CSV file formats that work with `CardKeyProviderCsv`
|
||||
may be used. To demonstrate how the import process works, let's assume you want
|
||||
to import a CSV file format that looks like the following example. Let's also
|
||||
assume that you didn't get the Global Platform keys from your card vendor for
|
||||
this batch of UICC cards, so your CSV file lacks the columns for those fields.
|
||||
|
||||
::
|
||||
|
||||
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1"
|
||||
"card1","999700000000001","8900000000000000001","0001","1111","11111111","0101","01010101","11111111111111111111111111111111","11111111111111111111111111111111","11111111"
|
||||
"card2","999700000000002","8900000000000000002","0002","2222","22222222","0202","02020202","22222222222222222222222222222222","22222222222222222222222222222222","22222222"
|
||||
"card3","999700000000003","8900000000000000003","0003","3333","22222222","0303","03030303","33333333333333333333333333333333","33333333333333333333333333333333","33333333"
|
||||
|
||||
Since this is your first import, the database still lacks the table. To
|
||||
instruct the importer to create a new table, you may use the `--create-table`
|
||||
option. You also have to pick an appropriate name for the table. Any name may
|
||||
be chosen as long as it contains the string `uicc_keys` or `euicc_keys`,
|
||||
depending on the type of data (`UICC` or `eUICC`) you intend to store in the
|
||||
table. The creation of the table is an administrative task and can only be done
|
||||
with the `admin` user. The `admin` user is selected using the `--admin` switch.
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys --create-table --admin
|
||||
INFO: CSV file: ./csv-to-pgsql_example_01.csv
|
||||
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
|
||||
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
|
||||
INFO: Database host: 127.0.0.1
|
||||
INFO: Database name: my_database
|
||||
INFO: Database user: my_admin_user
|
||||
INFO: New database table created: uicc_keys
|
||||
INFO: Database table: uicc_keys
|
||||
INFO: Database table columns: ['ICCID', 'IMSI']
|
||||
INFO: Adding missing columns: ['PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||
INFO: Changes to table uicc_keys committed!
|
||||
|
||||
The importer has created a new table with the name `uicc_keys`. The table is
|
||||
now ready to be filled with data.
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_01.csv --table-name uicc_keys
|
||||
INFO: CSV file: ./csv-to-pgsql_example_01.csv
|
||||
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1']
|
||||
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
|
||||
INFO: Database host: 127.0.0.1
|
||||
INFO: Database name: my_database
|
||||
INFO: Database user: my_importer_user
|
||||
INFO: Database table: uicc_keys
|
||||
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||
INFO: CSV file import done, 3 rows imported
|
||||
INFO: Changes to table uicc_keys committed!
|
||||
|
||||
A quick `SELECT * FROM uicc_keys;` at the PostgreSQL console should now display
|
||||
the contents of the CSV file you have fed into the importer.
|
||||
|
||||
Let's now assume that with your next batch of UICC cards your vendor includes
|
||||
the Global Platform keys so your CSV format changes. It may now look like this:
|
||||
|
||||
::
|
||||
|
||||
"id","imsi","iccid","acc","pin1","puk1","pin2","puk2","ki","opc","adm1","scp02_dek_1","scp02_enc_1","scp02_mac_1"
|
||||
"card4","999700000000004","8900000000000000004","0004","4444","44444444","0404","04040404","44444444444444444444444444444444","44444444444444444444444444444444","44444444","44444444444444444444444444444444","44444444444444444444444444444444","44444444444444444444444444444444"
|
||||
"card5","999700000000005","8900000000000000005","0005","4444","55555555","0505","05050505","55555555555555555555555555555555","55555555555555555555555555555555","55555555","55555555555555555555555555555555","55555555555555555555555555555555","55555555555555555555555555555555"
|
||||
"card6","999700000000006","8900000000000000006","0006","4444","66666666","0606","06060606","66666666666666666666666666666666","66666666666666666666666666666666","66666666","66666666666666666666666666666666","66666666666666666666666666666666","66666666666666666666666666666666"
|
||||
|
||||
When importing data from an updated CSV format the database table also has
|
||||
to be updated. This is done using the `--update-columns` switch. Like when
|
||||
creating new tables, this operation also requires admin privileges, so the
|
||||
`--admin` switch is required again.
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys --update-columns --admin
|
||||
INFO: CSV file: ./csv-to-pgsql_example_02.csv
|
||||
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
|
||||
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
|
||||
INFO: Database host: 127.0.0.1
|
||||
INFO: Database name: my_database
|
||||
INFO: Database user: my_admin_user
|
||||
INFO: Database table: uicc_keys
|
||||
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC']
|
||||
INFO: Adding missing columns: ['SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
|
||||
INFO: Changes to table uicc_keys committed!
|
||||
|
||||
When the new table columns are added, the import may be continued like the
|
||||
first one:
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=../ ./csv-to-pgsql.py --csv ./csv-to-pgsql_example_02.csv --table-name uicc_keys
|
||||
INFO: CSV file: ./csv-to-pgsql_example_02.csv
|
||||
INFO: CSV file columns: ['ID', 'IMSI', 'ICCID', 'ACC', 'PIN1', 'PUK1', 'PIN2', 'PUK2', 'KI', 'OPC', 'ADM1', 'SCP02_DEK_1', 'SCP02_ENC_1', 'SCP02_MAC_1']
|
||||
INFO: Using config file: /home/user/.osmocom/pysim/card_data_pgsql.cfg
|
||||
INFO: Database host: 127.0.0.1
|
||||
INFO: Database name: my_database
|
||||
INFO: Database user: my_importer_user
|
||||
INFO: Database table: uicc_keys
|
||||
INFO: Database table columns: ['ICCID', 'IMSI', 'PIN2', 'PUK1', 'PUK2', 'ACC', 'ID', 'PIN1', 'ADM1', 'KI', 'OPC', 'SCP02_ENC_1', 'SCP02_MAC_1', 'SCP02_DEK_1']
|
||||
INFO: CSV file import done, 3 rows imported
|
||||
INFO: Changes to table uicc_keys committed!
|
||||
|
||||
On the PostgreSQL console a `SELECT * FROM uicc_keys;` should now show the
|
||||
imported data with the added columns. All important data should now also be
|
||||
available from within pySim-shell via the `CardKeyProviderPgsql`.
|
||||
|
||||
|
||||
Column-Level CSV encryption
|
||||
---------------------------
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
pySim supports column-level CSV encryption. This feature will make sure
|
||||
that your key material is not stored in plaintext in the CSV file (or
|
||||
database).
|
||||
that your key material is not stored in plaintext in the CSV file.
|
||||
|
||||
The encryption mechanism uses AES in CBC mode. You can use any key
|
||||
length permitted by AES (128/192/256 bit).
|
||||
@@ -287,8 +72,6 @@ by all columns of the set:
|
||||
* `SCP03_ISDA` is a group alias for `SCP03_ENC_ISDA`, `SCP03_MAC_ISDA`, `SCP03_DEK_ISDA`
|
||||
* `SCP03_ISDR` is a group alias for `SCP03_ENC_ISDR`, `SCP03_MAC_ISDR`, `SCP03_DEK_ISDR`
|
||||
|
||||
NOTE: When using `CardKeyProviderPqsl`, the input CSV files must be encrypted
|
||||
before import.
|
||||
|
||||
Field naming
|
||||
------------
|
||||
@@ -299,9 +82,9 @@ Field naming
|
||||
* For look-up of eUICC specific key material (like SCP03 keys for the
|
||||
ISD-R, ECASD), pySim uses the `EID` field as lookup key.
|
||||
|
||||
As soon as the CardKeyProvider finds a line (row) in your CSV file
|
||||
(or database) where the ICCID or EID match, it looks for the column containing
|
||||
the requested data.
|
||||
As soon as the CardKeyProviderCsv finds a line (row) in your CSV where
|
||||
the ICCID or EID match, it looks for the column containing the requested
|
||||
data.
|
||||
|
||||
|
||||
ADM PIN
|
||||
|
||||
@@ -48,7 +48,6 @@ pySim consists of several parts:
|
||||
sim-rest
|
||||
suci-keytool
|
||||
saip-tool
|
||||
smpp-ota-tool
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
||||
@@ -1,842 +0,0 @@
|
||||
Guide: Managing GP Keys
|
||||
=======================
|
||||
|
||||
Most of todays smartcards follow the GlobalPlatform Card Specification and the included Security Domain model.
|
||||
UICCs and eUCCCs are no exception here.
|
||||
|
||||
The Security Domain acts as an on-card representative of a card authority or administrator. It is used to perform tasks
|
||||
like the installation of applications or the provisioning and rotation of secure channel keys. It also acts as a secure
|
||||
key storage and offers all kinds of cryptographic services to applications that are installed under a specific
|
||||
Security Domain (see also GlobalPlatform Card Specification, section 7).
|
||||
|
||||
In this tutorial, we will show how to work with the key material (keysets) stored inside a Security Domain and how to
|
||||
rotate (replace) existing keys. We will also show how to provision new keys.
|
||||
|
||||
.. warning:: Making changes to keysets requires extreme caution as misconfigured keysets may lock you out permanently.
|
||||
It also strongly recommended to maintain at least one backup keyset that you can use as fallback in case
|
||||
the primary keyset becomes unusable for some reason.
|
||||
|
||||
|
||||
Selecting a Security Domain
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
A typical smartcard, such as an UICC will have one primary Security Domain, called the Issuer Security Domain (ISD).
|
||||
When working with those cards, the ISD will show up in the UICC filesystem tree as `ADF.ISD` and can be selected like
|
||||
any other file.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (00:MF)> select ADF.ISD
|
||||
{
|
||||
"application_id": "a000000003000000",
|
||||
"proprietary_data": {
|
||||
"maximum_length_of_data_field_in_command_message": 255
|
||||
}
|
||||
}
|
||||
|
||||
When working with eUICCs, multiple Security Domains are involved. The model is slightly different from the classic
|
||||
model with one primary ISD. In the case of eUICCs, an ISD-R and an ISD-P exists.
|
||||
|
||||
The ISD-R (Issuer Security Domain - Root) is indeed the primary ISD. Its purpose is to handle the installation of new
|
||||
profiles and to manage the already installed profiles. The ISD-R shows up as a `ADF.ISD-R` and can be selected normally
|
||||
(see above) The key material that allows access to the ISD-R is usually only known to the eUICC manufacturer.
|
||||
|
||||
The ISD-P (Issuer Security Domain - Profile) is the primary ISD of the currently enabled profile. The ISD-P is
|
||||
comparable to the ISD we find on a UICC. The key material for the ISD-P should be known known to the ISP, which
|
||||
is the owner of the installed profile.
|
||||
|
||||
Since the AID of the ISD-P is allocated during the profile installation and different for each profile, it is not known
|
||||
by pySim-shell. This means there will no `ADF.ISD-P` file show up in the file system, but we can simply select the
|
||||
ISD-R, request the AID of the ISD-P and switch over to that ISD-P using a raw APDU:
|
||||
``00a4040410`` + ``a0000005591010ffffffff8900001000`` + ``00``
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (00:MF)> select ADF.ISD-R
|
||||
{
|
||||
"application_id": "a0000005591010ffffffff8900000100",
|
||||
"proprietary_data": {
|
||||
"maximum_length_of_data_field_in_command_message": 255
|
||||
},
|
||||
"isdr_proprietary_application_template": {
|
||||
"supported_version_number": "020300"
|
||||
}
|
||||
}
|
||||
pySIM-shell (00:MF/ADF.ISD-R)> get_profiles_info
|
||||
{
|
||||
"profile_info_seq": {
|
||||
"profile_info": {
|
||||
"iccid": "8949449999999990023",
|
||||
"isdp_aid": "a0000005591010ffffffff8900001000",
|
||||
"profile_state": "enabled",
|
||||
"service_provider_name": "OsmocomSPN",
|
||||
"profile_name": "TS48V1-A-UNIQUE",
|
||||
"profile_class": "operational"
|
||||
}
|
||||
}
|
||||
}
|
||||
pySIM-shell (00:MF/ADF.ISD-R)> apdu 00a4040410a0000005591010ffffffff890000100000
|
||||
SW: 9000, RESP: 6f188410a0000005591010ffffffff8900001000a5049f6501ff
|
||||
pySIM-shell (00:MF/ADF.ISD-R)>
|
||||
|
||||
After that, the prompt will still show the ADF.ISD-R, but we are actually in ADF.ISD-P and the standard GlobalPlatform
|
||||
operations like `establish_scpXX`, `get_data`, and `put_key` should work. The same workaround can also be applied to any
|
||||
Supplementary Security Domain as well, provided that the AID is known to the user.
|
||||
|
||||
|
||||
Establishing a secure channel
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Before we can make changes to the keysets in the currently selected Security Domain we must first establish a secure channel
|
||||
with that Security Domain. The secure channel protocols commonly used for this are `SCP02` (see also GlobalPlatform Card
|
||||
Specification, section E.1.1) and `SCP03` (see also GlobalPlatform Card Specification – Amendment D). `SCP02` is slightly
|
||||
older and commonly used on UICCs. The more modern `SCP03` is commonly used on eUICCs. The main difference between the
|
||||
two is that `SCP02` uses 3DES while `SCP03` is based on AES.
|
||||
|
||||
.. warning:: Secure channel protocols like `SCP02` and `SCP03` may manage an error counter to count failed login
|
||||
attempts. This means attempting to establish a secure channel with a wrong keyset multiple times may lock
|
||||
you out permanently. Double check the applied keyset before attempting to establish a secure channel.
|
||||
|
||||
|
||||
Example: `SCP02`
|
||||
----------------
|
||||
|
||||
In the following example, we assume that we want to establish a secure channel with the ISD of a `sysmoUSIM-SJA5` UICC.
|
||||
Along with the card we have received the following keyset:
|
||||
|
||||
+---------+----------------------------------+
|
||||
| Keyname | Keyvalue |
|
||||
+=========+==================================+
|
||||
| ENC/KIC | F09C43EE1A0391665CC9F05AF4E0BD10 |
|
||||
+---------+----------------------------------+
|
||||
| MAC/KID | 01981F4A20999F62AF99988007BAF6CA |
|
||||
+---------+----------------------------------+
|
||||
| DEK/KIK | 8F8AEE5CDCC5D361368BC45673D99195 |
|
||||
+---------+----------------------------------+
|
||||
|
||||
This keyset is tied to the key version number KVN 122 and is configured as a DES keyset. We can use this keyset to
|
||||
establish a secure channel using the SCP02 Secure Channel Protocol.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (00:MF/ADF.ISD)> establish_scp02 --key-enc F09C43EE1A0391665CC9F05AF4E0BD10 --key-mac 01981F4A20999F62AF99988007BAF6CA --key-dek 8F8AEE5CDCC5D361368BC45673D99195 --security-level 3
|
||||
Successfully established a SCP02[03] secure channel
|
||||
|
||||
|
||||
Example: `SCP03`
|
||||
----------------
|
||||
|
||||
The establishment of a secure channel via SCP03 works just the same. In the following example we will establish a
|
||||
secure channel to the ISD-R of an eUICC. The SCP03 keyset we use is tied to KVN 50 and looks like this:
|
||||
|
||||
+---------+------------------------------------------------------------------+
|
||||
| Keyname | Keyvalue |
|
||||
+=========+==================================================================+
|
||||
| ENC/KIC | 620ff456b0c0328b68dc0d7d5eb24e07dd749aa86c9ff1836a7263e1d8896510 |
|
||||
+---------+------------------------------------------------------------------+
|
||||
| MAC/KID | b38116a2c85f2c8f46bbdc0081d6e8a04b0a58087d0ce5ee0ccc4c945e4aeda6 |
|
||||
+---------+------------------------------------------------------------------+
|
||||
| DEK/KIK | d409486cbcb8092a8592ee46d8668dfa97bea5eb7ce9c2b5a3f3bb1db358a153 |
|
||||
+---------+------------------------------------------------------------------+
|
||||
|
||||
We assume that ADF.ISD-R is already selected. We may now establish the SCP03 secure channel:
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (00:MF/ADF.ISD-R)> establish_scp03 --key-enc 620ff456b0c0328b68dc0d7d5eb24e07dd749aa86c9ff1836a7263e1d8896510 --key-mac b38116a2c85f2c8f46bbdc0081d6e8a04b0a58087d0ce5ee0ccc4c945e4aeda6 --key-dek d409486cbcb8092a8592ee46d8668dfa97bea5eb7ce9c2b5a3f3bb1db358a153 --key-ver 50 --security-level 3
|
||||
Successfully established a SCP03[03] secure channel
|
||||
|
||||
|
||||
Understanding Keysets
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Before making any changes to keysets, it is recommended to check the status of the currently installed keysets. To do
|
||||
so, we use the `get_data` command to retrieve the `key_information`. We cannot read back the key values themselves, but
|
||||
we get a summary of the installed keys together with their KVN numbers, IDs, algorithm and key length values.
|
||||
|
||||
Example: `key_information` from a `sysmoISIM-SJA5`:
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 112,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 112,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 112,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 1,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 1,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 1,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 2,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 2,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 2,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 47,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 47,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 47,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Example: `key_information` from a `sysmoEUICC1-C2T`:
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP03[03]:00:MF/ADF.ISD-R)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 50,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 50,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 50,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 64,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 64,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "tls_psk",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
The output from those two examples above may seem lengthy, but in order to move on and to provision own keys
|
||||
successfully, it is important to understand each aspect of it.
|
||||
|
||||
Key Version Number (KVN)
|
||||
------------------------
|
||||
|
||||
Each key is associated with a Key Version Number (KVN). Multiple keys that share the same KVN belong to the same
|
||||
keyset. In the first example above we can see that four keysets with KVN numbers 112, 1, 2 and 47 are provisioned.
|
||||
In the second example we see two keysets. One with KVN 50 and one with KVN 64.
|
||||
|
||||
The term "Key Version Number" is misleading as this number is not really a version number. It's actually a unique
|
||||
identifier for a specific keyset that also defines with which Secure Channel Protocol a key can be used. This means
|
||||
that the KVN is not just an arbitrary number. The following (incomplete) table gives a hint which KVN numbers may be
|
||||
used with which Secure Channel Protocol.
|
||||
|
||||
+-----------+-------------------------------------------------------+
|
||||
| KVN range | Secure Channel Protocol |
|
||||
+===========+=======================================================+
|
||||
| 1-15 | reserved for `SCP80` (OTA SMS) |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 17 | reserved for DAP specified in ETSI TS 102 226 |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 32-47 | reserved for `SCP02` |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 48-63 | reserved for `SCP03` |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 112 | Token key (RSA public or DES, also used with `SCP02`) |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 113 | Receipt key (DES) |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 115 | DAP verifiation key (RS public or DES) |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 116 | reserved for CASD |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 117 | 16-byte DES key for Ciphered Load File Data Block |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 129-143 | reserved for `SCP81` |
|
||||
+-----------+-------------------------------------------------------+
|
||||
| 255 | reserved for ISD with SCP02 without SCP80 support |
|
||||
+-----------+-------------------------------------------------------+
|
||||
|
||||
With that we can now understand that in the first example, the first and the last keyset is intended to be used with
|
||||
`SCP02` and that the second and the third keyset is intended to be used with `SCP80` (OTA SMS). In the second example we
|
||||
can see that the first keyset is intended to be used with `SCP03`, wheres the second should be usable with `SCP81`.
|
||||
|
||||
|
||||
Key Identifier
|
||||
--------------
|
||||
|
||||
Each keyset consists of a number of keys, where each key has a different Key Identifier. The Key Identifier is usually
|
||||
an incrementing number that starts counting at 1. The Key Identifier is used to distinguish the keys within the keyset.
|
||||
The exact number of keys and their attributes depends on the secure channel protocol for which the keyset is intended
|
||||
for. Each secure channel protocol may have its specific requirements on how many keys of which which type, length or
|
||||
Key Identifier have to be present.
|
||||
|
||||
However, almost all of the classic secure channel protocols (including `SCP02`, `SCP03` and `SCP81`) make use of the
|
||||
following three-key scheme:
|
||||
|
||||
+----------------+---------+---------------------------------------+
|
||||
| Key Identifier | Keyname | Purpose |
|
||||
+================+=========+=======================================+
|
||||
| 1 | ENC/KIC | encryption/decryption |
|
||||
+----------------+---------+---------------------------------------+
|
||||
| 2 | MAC/KID | cryptographic checksumming/signing |
|
||||
+----------------+---------+---------------------------------------+
|
||||
| 3 | DEK/KIK | encryption/decryption of key material |
|
||||
+----------------+---------+---------------------------------------+
|
||||
|
||||
In this case, all three keys share the same length and are used with the same algorithm. The key length is often used
|
||||
to implicitly select sub-types of an algorithm. (e.g. a 16 byte key of type `aes` is associated with `AES128`, where a 32
|
||||
byte key would be associated with `AES256`).
|
||||
|
||||
That different schemes are possible shows the second example. The `SCP80` keyset from the second example uses a scheme
|
||||
that works with two keys:
|
||||
|
||||
+----------------+---------+---------------------------------------+
|
||||
| Key Identifier | Keyname | Purpose |
|
||||
+================+=========+=======================================+
|
||||
| 1 | TLS-PSK | pre-shared key used for TLS |
|
||||
+----------------+---------+---------------------------------------+
|
||||
| 2 | DEK/KIK | encryption/decryption of key material |
|
||||
+----------------+---------+---------------------------------------+
|
||||
|
||||
It should also be noted that the order in which keysets and keys appear is an implementation detail of the UICC/eUICC
|
||||
O/S. The order has no influence on how a keyset is interpreted. Only the Key Version Number (KVN) and the Key Identifier
|
||||
matter.
|
||||
|
||||
|
||||
Rotating a keyset
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
Rotating keys is one of the most basic tasks one might want to perform on an UICC/eUICC before using it productively. In
|
||||
the following example we will illustrate how key rotation can be done. When rotating keys, only the key itself may
|
||||
change. For example it is not possible to change the key length or the algorithm used (see also GlobalPlatform Card
|
||||
Specification, section 11.8.2.3.3). Any key of the current Security Domain can be rotated, this also includes the key
|
||||
that was used to establish the secure channel.
|
||||
|
||||
In the following example we assume that the Security Domain is selected and a secure channel is already established. We
|
||||
intend to rotate the keyset with KVN 112. Since this keyset uses triple DES keys with a key length of 16, we must
|
||||
replace it with a keyset with keys of the same nature.
|
||||
|
||||
The new keyset shall look like this:
|
||||
|
||||
+----------------+---------+----------------------------------+
|
||||
| Key Identifier | Keyname | Keyvalue |
|
||||
+================+=========+==================================+
|
||||
| 1 | ENC/KIC | 542C37A6043679F2F9F71116418B1CD5 |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 2 | MAC/KID | 34F11BAC8E5390B57F4E601372339E3C |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 3 | DEK/KIK | 5524F4BECFE96FB63FC29D6BAAC6058B |
|
||||
+----------------+---------+----------------------------------+
|
||||
|
||||
When passing the keys to the `put_key` commandline, we set the Key Identifier of the first key using the `--key-id`
|
||||
parameter. This Key Identifier will be valid for the first key (KIC) we pass. For all consecutive keys, the Key
|
||||
Identifier will be incremented automatically (see also GlobalPlatform Card Specification, section 11.8.2.2). To Ensure
|
||||
that the new KIC, KID and KIK keys get the correct Key Identifiers, it is crucial to maintain order when passing the
|
||||
keys in the `--key-data` arguments. It is also important that each `--key-data` argument is preceded by a `--key-type`
|
||||
argument that sets the algorithm correctly (`des` in this case).
|
||||
|
||||
Finally we have to target the keyset we want to rotate by its KVN. The `--old-key-version-nr` argument is set to 112
|
||||
as this is identifies the keyset we want to rotate. The `--key-version-nr` is also set to 112 as we do not want to the
|
||||
KVN to be changed in this example. Changing the KVN while rotating a keyset is possible. In case the KVN has to change
|
||||
for some reason, the new KVN must be selected carefully to keep the key usable with the associated Secure Channel
|
||||
Protocol.
|
||||
|
||||
The commandline that matches the keyset we had laid out above looks like this:
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> put_key --key-id 1 --key-type des --key-data 542C37A6043679F2F9F71116418B1CD5 --key-type des --key-data 34F11BAC8E5390B57F4E601372339E3C --key-type des --key-data 5524F4BECFE96FB63FC29D6BAAC6058B --old-key-version-nr 112 --key-version-nr 112
|
||||
|
||||
After executing this put_key commandline, the keyset identified by KVN 122 is equipped with new keys. We can use
|
||||
`get_data key_information` to inspect the currently installed keysets. The output should appear unchanged as
|
||||
we only swapped out the keys. All other parameters, identifiers etc. should remain constant.
|
||||
|
||||
.. warning:: It is technically possible to rotate a keyset in a `non atomic` way using one `put_key` commandline for
|
||||
each key. However, in case the targeted keyset is the one used to establish the current secure channel,
|
||||
this method should not be used since, depending on the UICC/eUICC model, half-written key material may
|
||||
interrupt the current secure channel.
|
||||
|
||||
|
||||
Removing a keyset
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
In some cases it is necessary to remove a keyset entirely. This can be done with the `delete_key` command. Here it is
|
||||
important to understand that `delete_key` only removes one specific key from a specific keyset. This means that you
|
||||
need to run a separate `delete_key` command for each key inside a keyset.
|
||||
|
||||
In the following example we assume that the Security Domain is selected and a secure channel is already established. We
|
||||
intend to remove the keyset with KVN 112. This keyset consists of three keys.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> delete_key --key-ver 112 --key-id 1
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> delete_key --key-ver 112 --key-id 2
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> delete_key --key-ver 112 --key-id 3
|
||||
|
||||
To verify that the keyset has been deleted properly, we can use the `get_data key_information` command to inspect the
|
||||
current status of the installed keysets. We should see that the key with KVN 112 is no longer present.
|
||||
|
||||
|
||||
Adding a keyset
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
In the following we will discuss how to add an entirely new keyset. The procedure is almost identical with the key
|
||||
rotation procedure we have already discussed and it is assumed that all details about the key rotation are understood.
|
||||
In this section we will go into more detail and and illustrate how to provision new 3DES, `AES128` and `AES256` keysets.
|
||||
|
||||
It is important to keep in mind that storage space on smartcard is a precious resource. In many cases the amount of
|
||||
keysets that a Security Domain can store is limited. In some situations you may be forced to sacrifice one of your
|
||||
existing keysets in favor of a new keyset.
|
||||
|
||||
The main difference between key rotation and the adding of new keys is that we do not simply replace an existing key.
|
||||
Instead an entirely new key is programmed into the Security Domain. Therefore the `put_key` commandline will have no
|
||||
`--old-key-version-nr` parameter. From the commandline perspective, this is already the only visible difference from a
|
||||
commandline that simply rotates a keyset. Since we are writing an entirely new keyset, we are free to chose the
|
||||
algorithm and the key length within the parameter range permitted by the targeted secure channel protocol. Otherwise
|
||||
the same rules apply.
|
||||
|
||||
For reference, it should be mentioned that it is also possible to add or rotate keyset using multiple `put_key`
|
||||
commandlines. In this case one `put_key` commandline for each key is used. Each commandline will specify `--key-id` and
|
||||
`--key-version-nr` and one `--key-type` and `--key-data` tuple. However, when rotating or adding a keyset step-by-step,
|
||||
the whole process happens in a `non-atomic` way, which is less reliable. Therefore we will favor the `atomic method`
|
||||
|
||||
In the following examples we assume that the Security Domain is selected and a secure channel is already established.
|
||||
|
||||
|
||||
Example: `3DES` key for `SCP02`
|
||||
-------------------------------
|
||||
|
||||
Let's assume we want to provision a new 3DES keyset that we can use for SCP02. The keyset shall look like this:
|
||||
|
||||
+----------------+---------+----------------------------------+
|
||||
| Key Identifier | Keyname | Keyvalue |
|
||||
+================+=========+==================================+
|
||||
| 1 | ENC/KIC | 542C37A6043679F2F9F71116418B1CD5 |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 2 | MAC/KID | 34F11BAC8E5390B57F4E601372339E3C |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 3 | DEK/KIK | 5524F4BECFE96FB63FC29D6BAAC6058B |
|
||||
+----------------+---------+----------------------------------+
|
||||
|
||||
The keyset shall be a associated with the KVN 46. We have made sure before that KVN 46 is still unused and that this
|
||||
KVN number is actually suitable for SCP02 keys. As we are using 3DES, it is obvious that we have to pass 3 keys with 16
|
||||
byte length.
|
||||
|
||||
To program the key, we may use the following commandline. As we can see, this commandline is almost the exact same as
|
||||
the one from the key rotation example where we were rotating a 3DES key. The only difference is that we didn't specify
|
||||
an old KVN number and that we have chosen a different KVN.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> put_key --key-id 1 --key-type des --key-data 542C37A6043679F2F9F71116418B1CD5 --key-type des --key-data 34F11BAC8E5390B57F4E601372339E3C --key-type des --key-data 5524F4BECFE96FB63FC29D6BAAC6058B --key-version-nr 46
|
||||
|
||||
In case of success, the keyset should appear in the `key_information` among the other keysets that are already present.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 46,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 46,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 46,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "des",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
Example: `AES128` key for `SCP80`
|
||||
---------------------------------
|
||||
|
||||
In this example we intend to provision a new `AES128` keyset that we can use with SCP80 (OTA SMS). The keyset shall look
|
||||
like this:
|
||||
|
||||
+----------------+---------+----------------------------------+
|
||||
| Key Identifier | Keyname | Keyvalue |
|
||||
+================+=========+==================================+
|
||||
| 1 | ENC/KIC | 542C37A6043679F2F9F71116418B1CD5 |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 2 | MAC/KID | 34F11BAC8E5390B57F4E601372339E3C |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 3 | DEK/KIK | 5524F4BECFE96FB63FC29D6BAAC6058B |
|
||||
+----------------+---------+----------------------------------+
|
||||
|
||||
In addition to that, we want to associate this key with KVN 3. We have inspected the currently installed keysets before
|
||||
and made sure that KVN 3 is still unused. We are also aware that for SCP80 we may only use KVN values from 1 to 15.
|
||||
|
||||
For `AES128`, we specify the algorithm using the `--key-type aes` parameter. The selection between `AES128` and `AES256` is
|
||||
done implicitly using the key length. Since we want to use `AES128` in this case, all three keys have a length of 16 byte.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> put_key --key-id 1 --key-type aes --key-data 542C37A6043679F2F9F71116418B1CD5 --key-type aes --key-data 34F11BAC8E5390B57F4E601372339E3C --key-type aes --key-data 5524F4BECFE96FB63FC29D6BAAC6058B --key-version-nr 3
|
||||
|
||||
In case of success, the keyset should appear in the `key_information` among the other keysets that are already present.
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 3,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 3,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 3,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
Example: `AES256` key for `SCP03`
|
||||
---------------------------------
|
||||
|
||||
Let's assume we want to provision a new `AES256` keyset that we can use for SCP03. The keyset shall look like this:
|
||||
|
||||
+----------------+---------+------------------------------------------------------------------+
|
||||
| Key Identifier | Keyname | Keyvalue |
|
||||
+================+=========+==================================================================+
|
||||
| 1 | ENC/KIC | 542C37A6043679F2F9F71116418B1CD5542C37A6043679F2F9F71116418B1CD5 |
|
||||
+----------------+---------+------------------------------------------------------------------+
|
||||
| 2 | MAC/KID | 34F11BAC8E5390B57F4E601372339E3C34F11BAC8E5390B57F4E601372339E3C |
|
||||
+----------------+---------+------------------------------------------------------------------+
|
||||
| 3 | DEK/KIK | 5524F4BECFE96FB63FC29D6BAAC6058B5524F4BECFE96FB63FC29D6BAAC6058B |
|
||||
+----------------+---------+------------------------------------------------------------------+
|
||||
|
||||
In addition to that, we assume that we want to associate this key with KVN 51. This KVN number falls in the range of
|
||||
48 - 63 and is therefore suitable for a key that shall be usable with SCP03. We also made sure before that KVN 51 is
|
||||
still unused.
|
||||
|
||||
With that we can go ahead and make up the following commandline:
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> put_key --key-id 1 --key-type aes --key-data 542C37A6043679F2F9F71116418B1CD5542C37A6043679F2F9F71116418B1CD5 --key-type aes --key-data 34F11BAC8E5390B57F4E601372339E3C34F11BAC8E5390B57F4E601372339E3C --key-type aes --key-data 5524F4BECFE96FB63FC29D6BAAC6058B5524F4BECFE96FB63FC29D6BAAC6058B --key-version-nr 51
|
||||
|
||||
In case of success, we should see the keyset in the `key_information`
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP02[03]:00:MF/ADF.ISD)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 51,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 51,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 3,
|
||||
"key_version_number": 51,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 32
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
Example: `AES128` key for `SCP81`
|
||||
---------------------------------
|
||||
|
||||
In this example we will show how to provision a new `AES128` keyset for `SCP81`. We will provision this keyset under
|
||||
KVN 64. The keyset we intend to apply shall look like this:
|
||||
|
||||
+----------------+---------+----------------------------------+
|
||||
| Key Identifier | Keyname | Keyvalue |
|
||||
+================+=========+==================================+
|
||||
| 1 | TLS-PSK | 000102030405060708090a0b0c0d0e0f |
|
||||
+----------------+---------+----------------------------------+
|
||||
| 2 | DEK/KIK | 000102030405060708090a0b0c0d0e0f |
|
||||
+----------------+---------+----------------------------------+
|
||||
|
||||
With that we can put together the following command line:
|
||||
|
||||
::
|
||||
|
||||
put_key --key-id 1 --key-type tls_psk --key-data 000102030405060708090a0b0c0d0e0f --key-type aes --key-data 000102030405060708090a0b0c0d0e0f --key-version-nr 64
|
||||
|
||||
In case of success, the keyset should appear in the `key_information` as follows:
|
||||
|
||||
::
|
||||
|
||||
pySIM-shell (SCP03[03]:00:MF/ADF.ISD-R)> get_data key_information
|
||||
{
|
||||
"key_information": [
|
||||
...,
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 2,
|
||||
"key_version_number": 64,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "aes",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_information_data": {
|
||||
"key_identifier": 1,
|
||||
"key_version_number": 64,
|
||||
"key_types": [
|
||||
{
|
||||
"type": "tls_psk",
|
||||
"length": 16
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -68,7 +68,7 @@ Usage Examples
|
||||
|
||||
suci-tutorial
|
||||
cap-tutorial
|
||||
put_key-tutorial
|
||||
|
||||
|
||||
Advanced Topics
|
||||
---------------
|
||||
|
||||
@@ -1,179 +0,0 @@
|
||||
smpp-ota-tool
|
||||
=============
|
||||
|
||||
The `smpp-ota-tool` allows users to send OTA SMS messages containing APDU scripts (RFM, RAM) via an SMPP server. The
|
||||
intended audience are developers who want to test/evaluate the OTA SMS interface of a SIM/UICC/eUICC. `smpp-ota-tool`
|
||||
is intended to be used as a companion tool for :ref:`pySim-smpp2sim`, however it should be usable on any other SMPP
|
||||
server (such as a production SMSC of a live cellular network) as well.
|
||||
|
||||
From the technical perspective `smpp-ota-tool` takes the role of an SMPP ESME. It takes care of the encoding, encryption
|
||||
and checksumming (signing) of the RFM/RAM OTA SMS and eventually submits it to the SMPP server. The program then waits
|
||||
for a response. The response is automatically parsed and printed on stdout. This makes the program also suitable to be
|
||||
called from shell scripts.
|
||||
|
||||
.. note:: In the following we will we will refer to `SIM` as one of the following: `SIM`, `USIM`, `ISIM`, `UICC`,
|
||||
`eUICC`, `eSIM`.
|
||||
|
||||
Applying OTA keys
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
Depending on the `SIM` type you will receive one or more sets of keys which you can use to communicate with the `SIM`
|
||||
through a secure channel protocol. When using the OTA SMS method, the SCP80 protocol is used and it therefore crucial
|
||||
to use a keyset that is actually suitable for SCP80.
|
||||
|
||||
A keyset usually consists of three keys:
|
||||
|
||||
#. KIC: the key used for ciphering (encryption/decryption)
|
||||
#. KID: the key used to compute a cryptographic checksum (signing)
|
||||
#. KIK: the key used to encrypt/decrypt key material (key rotation, adding of new keys)
|
||||
|
||||
From the transport security perspective, only KIC and KID are relevant. The KIK (also referenced as "Data Encryption
|
||||
Key", DEK) is only used when keys are rotated or new keys are added (see also ETSI TS 102 226, section 8.2.1.5).
|
||||
|
||||
When the keyset is programmed into the security domain of the `SIM`, it is tied to a specific cryptographic algorithm
|
||||
(3DES, AES128 or AES256) and a so called Key Version Number (KVN). The term "Key Version Number" is misleading, since
|
||||
it is actually not a version number. It is a unique identifier of a certain keyset which also identifies for which
|
||||
secure channel protocol the keyset may be used. Keysets with a KVN from 1-15 (``0x01``-``0x0F``) are suitable for SCP80.
|
||||
This means that it is not only important to know just the KIC/KID/KIK keys. Also the related algorithms and the KVN
|
||||
numbers must be known.
|
||||
|
||||
.. note:: SCP80 keysets typically start counting from 1 upwards. Typical configurations use a set of 3 keysets with
|
||||
KVN numbers 1-3.
|
||||
|
||||
Addressing an Application
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
When communicating with a specific application on a `SIM` via SCP80, it is important to address that application with
|
||||
the correct parameters. The following two parameters must be known in advance:
|
||||
|
||||
#. TAR: The Toolkit Application Reference (TAR) number is a three byte value that uniquely addresses an application
|
||||
on the `SIM`. The exact values may vary (see also ETSI TS 101 220, Table D.1).
|
||||
#. MSL: The Minimum Security Level (MSL) is a bit-field that dictates which of the security measures encoded in the
|
||||
SPI are mandatory (see also ETSI TS 102 225, section 5.1.1).
|
||||
|
||||
A practical example
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. note:: This tutorial assumes that pySim-smpp2sim is running on the local machine with its default parameters.
|
||||
See also :ref:`pySim-smpp2sim`.
|
||||
|
||||
Let's assume that an OTA SMS shall be sent to the SIM RFM application of an sysmoISIM-SJA2. What we want to do is to
|
||||
select DF.GSM and to get the select response back.
|
||||
|
||||
We have received the following key material from the `SIM` vendor:
|
||||
|
||||
::
|
||||
|
||||
KIC1: F09C43EE1A0391665CC9F05AF4E0BD10
|
||||
KID1: 01981F4A20999F62AF99988007BAF6CA
|
||||
KIK1: 8F8AEE5CDCC5D361368BC45673D99195
|
||||
KIC2: 01022916E945B656FDE03F806A105FA2
|
||||
KID2: D326CB69F160333CC5BD1495D448EFD6
|
||||
KIK2: 08037E0590DFE049D4975FFB8652F625
|
||||
KIC3: 2B22824D0D27A3A1CEEC512B312082B4
|
||||
KID3: F1697766925A11F4458295590137B672
|
||||
KIK3: C7EE69B2C5A1C8E160DD36A38EB517B3
|
||||
|
||||
Those are three keysets. The enumeration is directly equal to the KVN used. All three keysets are 3DES keys, which
|
||||
means triple_des_cbc2 is the correct algorithm to use.
|
||||
|
||||
.. note:: The key set configuration can be confirmed by retrieving the key configuration using
|
||||
`get_data key_information` from within an SCP02 session on ADF.ISD.
|
||||
|
||||
In this example we intend to address the SIM RFM application on the `SIM`. Which according to the manual has TAR ``B00010``
|
||||
and MSL ``0x06``. When we hold ``0x06`` = ``0b00000110`` against the SPI coding chart (see also ETSI TS 102 225,
|
||||
section 5.1.1). We can deduct that Ciphering and Cryptographic Checksum are mandatory.
|
||||
|
||||
.. note:: The MSL (see also ETSI TS 102 226, section 6.1) is assigned to an application by the `SIM` issuer. It is a
|
||||
custom decision and may vary with different `SIM` types/profiles. In the case of sysmoISIM-SJS1/SJA2/SJA5 the
|
||||
counter requirement has been waived to simplify lab/research type use. In productive environments, `SIM`
|
||||
applications should ideally use an MSL that makes the counter mandatory.
|
||||
|
||||
In order to select DF.GSM (``0x7F20``) and to retrieve the select response, two APDUs are needed. The first APDU is the
|
||||
select command ``A0A40000027F20`` and the second is the related get-response command ``A0C0000016``. Those APDUs will be
|
||||
concatenated and are sent in a single message. The message containing the concatenated APDUs works as a script that
|
||||
is received by the SIM RFM application and then executed. This method poses some limitations that have to be taken into
|
||||
account when making requests like this (see also ETSI TS 102 226, section 5).
|
||||
|
||||
With this information we may now construct a commandline for `smpp-ota-tool.py`. We will pass the KVN as kid_idx and
|
||||
kic_idx (see also ETSI TS 102 225, Table 2, fields `KIc` and `KID`). Both index values should refer to the same
|
||||
keyset/KVN as keysets should not be mixed. (`smpp-ota-tool` still provides separate parameters anyway to allow testing
|
||||
with invalid keyset combinations)
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016
|
||||
2026-02-26 17:13:56 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:13:56 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU sending: 02700000281506191515b00010da1d6cbbd0d11ce4330d844c7408340943e843f67a6d7b0674730881605fd62d...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU received: 027100002c12b000107ddf58d1780f771638b3975759f4296cf5c31efc87a16a1b61921426baa16da1b5ba1a9951d59a39
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\x8f\xea\xf5.\xf4\x0e\xc2\x14', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
|
||||
2026-02-26 17:13:56 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
2026-02-26 17:13:56 INFO Disconnecting...
|
||||
|
||||
The result we see is the select response of DF.GSM and a status word indicating that the last command has been
|
||||
processed normally.
|
||||
|
||||
As we can see, this mechanism now allows us to perform small administrative tasks remotely. We can read the contents of
|
||||
files remotely or make changes to files. Depending on the changes we make, there may be security issues arising from
|
||||
replay attacks. With the commandline above, the communication is encrypted and protected by a cryptographic checksum,
|
||||
so an adversary can neither read, nor alter the message. However, an adversary could still replay an intercepted
|
||||
message and the `SIM` would happily execute the contained APDUs again.
|
||||
|
||||
To prevent this, we may include a replay protection counter within the message. In this case, the MSL indicates that a
|
||||
replay protection counter is not required. However, to extended the security of our messages, we may chose to use a
|
||||
counter anyway. In the following example, we will encode a counter value of 100. We will instruct the `SIM` to make sure
|
||||
that the value we send is higher than the counter value that is currently stored in the `SIM`.
|
||||
|
||||
To add a replay connection counter we add the commandline arguments `--cntr-req` to set the counter requirement and
|
||||
`--cntr` to pass the counter value.
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
|
||||
2026-02-26 17:16:39 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:16:39 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU received: 027100002c12b0001049fb0315f6c6401b553867f412cefaf9355b38271178edb342a3bc9cc7e670cdc1f45eea6ffcbb39
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00d', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\xa9/\xc7\xc9\x00"\xab5', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
|
||||
2026-02-26 17:16:39 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
2026-02-26 17:16:39 INFO Disconnecting...
|
||||
|
||||
The `SIM` has accepted the message. The message got processed and the `SIM` has set its internal to 100. As an experiment,
|
||||
we may try to re-use the counter value:
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
|
||||
2026-02-26 17:16:43 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:16:43 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU received: 027100000b0ab0001000000000000006
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU decoded: (Container(rpl=11, rhl=10, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(6, 'undefined_security_error'), cc_rc=b'', secured_data=b''), None)
|
||||
Traceback (most recent call last):
|
||||
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 238, in <module>
|
||||
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 162, in transceive_apdu
|
||||
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
|
||||
ValueError: Response does not contain any last_response_data, no R-APDU received!
|
||||
2026-02-26 17:16:43 INFO Disconnecting...
|
||||
|
||||
As we can see, the `SIM` has rejected the message with an `undefined_security_error`. The replay-protection-counter
|
||||
ensures that a message can only be sent once.
|
||||
|
||||
.. note:: The replay-protection-counter is implemented as a 5 byte integer value (see also ETSI TS 102 225, Table 3).
|
||||
When the counter has reached its maximum, it will not overflow nor can it be reset.
|
||||
|
||||
smpp-ota-tool syntax
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. argparse::
|
||||
:module: contrib.smpp-ota-tool
|
||||
:func: option_parser
|
||||
:prog: contrib/smpp-ota-tool.py
|
||||
@@ -55,5 +55,3 @@ And once your external program is sending SMS to the simulated SMSC, it will log
|
||||
SMSPPDownload(DeviceIdentities({'source_dev_id': 'network', 'dest_dev_id': 'uicc'}),Address({'ton_npi': 0, 'call_number': '0123456'}),SMS_TPDU({'tpdu': '400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d'}))
|
||||
INFO root: ENVELOPE: d147820283818604001032548b3b400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d
|
||||
INFO root: SW 9000: 027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c
|
||||
|
||||
.. note:: for sending OTA SMS messages :ref:`smpp-ota-tool` may be used.
|
||||
|
||||
2
lint_pylint.sh
Executable file
2
lint_pylint.sh
Executable file
@@ -0,0 +1,2 @@
|
||||
#!/bin/sh
|
||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
||||
4
lint_ruff.sh
Executable file
4
lint_ruff.sh
Executable file
@@ -0,0 +1,4 @@
|
||||
#!/bin/sh -e
|
||||
set -x
|
||||
cd "$(dirname "$0")"
|
||||
ruff check .
|
||||
@@ -44,7 +44,6 @@ from pySim.exceptions import SwMatchError
|
||||
from pySim.legacy.cards import card_detect, SimCard, UsimCard, IsimCard
|
||||
from pySim.utils import dec_imsi, dec_iccid
|
||||
from pySim.legacy.utils import format_xplmn_w_act, dec_st, dec_msisdn
|
||||
from pySim.ts_51_011 import EF_SMSP
|
||||
|
||||
option_parser = argparse.ArgumentParser(description='Legacy tool for reading some parts of a SIM card',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
@@ -142,15 +141,6 @@ if __name__ == '__main__':
|
||||
(res, sw) = card.read_record('SMSP', 1)
|
||||
if sw == '9000':
|
||||
print("SMSP: %s" % (res,))
|
||||
ef_smsp = EF_SMSP()
|
||||
smsc_a = ef_smsp.decode_record_bin(h2b(res), 1).get('tp_sc_addr', {})
|
||||
smsc_n = smsc_a.get('call_number', None)
|
||||
if smsc_a.get('ton_npi', {}).get('type_of_number', None) == 'international' and smsc_n is not None:
|
||||
smsc = '+' + smsc_n
|
||||
else:
|
||||
smsc = smsc_n
|
||||
if smsc is not None:
|
||||
print("SMSC: %s" % (smsc,))
|
||||
else:
|
||||
print("SMSP: Can't read, response code = %s" % (sw,))
|
||||
|
||||
|
||||
@@ -69,12 +69,12 @@ from pySim.ts_102_222 import Ts102222Commands
|
||||
from pySim.gsm_r import DF_EIRENE
|
||||
from pySim.cat import ProactiveCommand
|
||||
|
||||
from pySim.card_key_provider import CardKeyProviderCsv, CardKeyProviderPgsql
|
||||
from pySim.card_key_provider import CardKeyProviderCsv
|
||||
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
||||
|
||||
from pySim.app import init_card
|
||||
|
||||
log = PySimLogger.get(Path(__file__).stem)
|
||||
log = PySimLogger.get("main")
|
||||
|
||||
class Cmd2Compat(cmd2.Cmd):
|
||||
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
|
||||
@@ -519,17 +519,8 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||
def do_version(self, opts):
|
||||
"""Print the pySim software version."""
|
||||
from importlib.metadata import version as vsn
|
||||
self.poutput("pyosmocom " + vsn('pyosmocom'))
|
||||
import os
|
||||
cwd = os.path.dirname(os.path.realpath(__file__))
|
||||
if os.path.isdir(os.path.join(cwd, ".git")):
|
||||
import subprocess
|
||||
url = subprocess.check_output(['git', 'config', '--get', 'remote.origin.url']).decode('ascii').strip()
|
||||
version = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=cwd).decode('ascii').strip()
|
||||
self.poutput(os.path.basename(url) + " " + version)
|
||||
else:
|
||||
self.poutput("pySim " + vsn('pySim'))
|
||||
import pkg_resources
|
||||
self.poutput(pkg_resources.get_distribution('pySim'))
|
||||
|
||||
@with_default_category('pySim Commands')
|
||||
class PySimCommands(CommandSet):
|
||||
@@ -1147,11 +1138,8 @@ global_group.add_argument("--verbose", help="Enable verbose logging",
|
||||
|
||||
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
||||
card_key_group.add_argument('--csv', metavar='FILE',
|
||||
default="~/.osmocom/pysim/card_data.csv",
|
||||
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
||||
help='Read card data from CSV file')
|
||||
card_key_group.add_argument('--pgsql', metavar='FILE',
|
||||
default="~/.osmocom/pysim/card_data_pgsql.cfg",
|
||||
help='Read card data from PostgreSQL database (config file)')
|
||||
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
help=argparse.SUPPRESS, dest='column_key')
|
||||
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||
@@ -1176,7 +1164,7 @@ if __name__ == '__main__':
|
||||
|
||||
# Ensure that we are able to print formatted warnings from the beginning.
|
||||
PySimLogger.setup(print, {logging.WARN: YELLOW})
|
||||
if opts.verbose:
|
||||
if (opts.verbose):
|
||||
PySimLogger.set_verbose(True)
|
||||
PySimLogger.set_level(logging.DEBUG)
|
||||
else:
|
||||
@@ -1189,10 +1177,8 @@ if __name__ == '__main__':
|
||||
for par in opts.column_key:
|
||||
name, key = par.split(':')
|
||||
column_keys[name] = key
|
||||
if os.path.isfile(os.path.expanduser(opts.csv)):
|
||||
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), column_keys))
|
||||
if os.path.isfile(os.path.expanduser(opts.pgsql)):
|
||||
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), column_keys))
|
||||
if os.path.isfile(opts.csv):
|
||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
||||
|
||||
# Init card reader driver
|
||||
sl = init_reader(opts, proactive_handler = Proact())
|
||||
|
||||
@@ -23,7 +23,6 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
|
||||
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
|
||||
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
|
||||
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
|
||||
from pySim.apdu_source.stdin_hex import StdinHexApduSource
|
||||
|
||||
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
|
||||
|
||||
@@ -191,10 +190,6 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||
help='Name of the log file to be read')
|
||||
|
||||
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
|
||||
Read APDUs as hex-string from stdin.""")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
opts = option_parser.parse_args()
|
||||
@@ -210,8 +205,6 @@ if __name__ == '__main__':
|
||||
s = PysharkGsmtapPcap(opts.pcap_file)
|
||||
elif opts.source == 'tca-loader-log':
|
||||
s = TcaLoaderLogApduSource(opts.log_file)
|
||||
elif opts.source == 'stdin-hex':
|
||||
s = StdinHexApduSource()
|
||||
else:
|
||||
raise ValueError("unsupported source %s", opts.source)
|
||||
|
||||
|
||||
@@ -84,5 +84,5 @@ class PysharkGsmtapPcap(_PysharkGsmtap):
|
||||
Args:
|
||||
pcap_filename: File name of the pcap file to be opened
|
||||
"""
|
||||
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim || iso7816.atr', use_json=True, keep_packets=False)
|
||||
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim', use_json=True, keep_packets=False)
|
||||
super().__init__(pyshark_inst)
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
# coding=utf-8
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from pySim.utils import h2b
|
||||
|
||||
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
|
||||
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
|
||||
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
|
||||
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
|
||||
|
||||
from . import ApduSource, PacketType, CardReset
|
||||
|
||||
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
|
||||
|
||||
class StdinHexApduSource(ApduSource):
|
||||
"""ApduSource for reading apdu hex-strings from stdin."""
|
||||
|
||||
def read_packet(self) -> PacketType:
|
||||
while True:
|
||||
command = input("C-APDU >")
|
||||
if len(command) == 0:
|
||||
continue
|
||||
response = '9000'
|
||||
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))
|
||||
@@ -7,7 +7,7 @@ there are also automatic card feeders.
|
||||
"""
|
||||
|
||||
#
|
||||
# (C) 2019 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2019 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
|
||||
@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
|
||||
operation with pySim-shell.
|
||||
"""
|
||||
|
||||
# (C) 2021-2025 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier, Harald Welte
|
||||
@@ -36,9 +36,8 @@ from pySim.log import PySimLogger
|
||||
import abc
|
||||
import csv
|
||||
import logging
|
||||
import yaml
|
||||
|
||||
log = PySimLogger.get(__name__)
|
||||
log = PySimLogger.get("CARDKEY")
|
||||
|
||||
card_key_providers = [] # type: List['CardKeyProvider']
|
||||
|
||||
@@ -58,7 +57,7 @@ class CardKeyFieldCryptor:
|
||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDA', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||
}
|
||||
|
||||
@@ -160,7 +159,6 @@ class CardKeyProviderCsv(CardKeyProvider):
|
||||
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : (see class CardKeyFieldCryptor)
|
||||
"""
|
||||
log.info("Using CSV file as card key data source: %s" % csv_filename)
|
||||
self.csv_file = open(csv_filename, 'r')
|
||||
if not self.csv_file:
|
||||
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||
@@ -188,69 +186,6 @@ class CardKeyProviderCsv(CardKeyProvider):
|
||||
return None
|
||||
return return_dict
|
||||
|
||||
class CardKeyProviderPgsql(CardKeyProvider):
|
||||
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
|
||||
|
||||
def __init__(self, config_filename: str, transport_keys: dict):
|
||||
"""
|
||||
Args:
|
||||
config_filename : file name (path) of CSV file containing card-individual key/data
|
||||
transport_keys : (see class CardKeyFieldCryptor)
|
||||
"""
|
||||
import psycopg2
|
||||
log.info("Using SQL database as card key data source: %s" % config_filename)
|
||||
with open(config_filename, "r") as cfg:
|
||||
config = yaml.load(cfg, Loader=yaml.FullLoader)
|
||||
log.info("Card key database name: %s" % config.get('db_name'))
|
||||
db_users = config.get('db_users')
|
||||
user = db_users.get('reader')
|
||||
if user is None:
|
||||
raise ValueError("user for role 'reader' not set up in config file.")
|
||||
self.conn = psycopg2.connect(dbname=config.get('db_name'),
|
||||
user=user.get('name'),
|
||||
password=user.get('pass'),
|
||||
host=config.get('host'))
|
||||
self.tables = config.get('table_names')
|
||||
log.info("Card key database tables: %s" % str(self.tables))
|
||||
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||
|
||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||
import psycopg2
|
||||
from psycopg2.sql import Identifier, SQL
|
||||
db_result = None
|
||||
for t in self.tables:
|
||||
self.conn.rollback()
|
||||
cur = self.conn.cursor()
|
||||
|
||||
# Make sure that the database table and the key column actually exists. If not, move on to the next table
|
||||
cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (t,))
|
||||
cols_result = cur.fetchall()
|
||||
if cols_result == []:
|
||||
log.warning("Card Key database seems to lack table %s, check config file!" % t)
|
||||
continue
|
||||
if (key.lower(),) not in cols_result:
|
||||
continue
|
||||
|
||||
# Query requested columns from database table
|
||||
query = SQL("SELECT {}").format(Identifier(fields[0].lower()))
|
||||
for f in fields[1:]:
|
||||
query += SQL(", {}").format(Identifier(f.lower()))
|
||||
query += SQL(" FROM {} WHERE {} = %s LIMIT 1;").format(Identifier(t.lower()),
|
||||
Identifier(key.lower()))
|
||||
cur.execute(query, (value,))
|
||||
db_result = cur.fetchone()
|
||||
cur.close()
|
||||
|
||||
if db_result:
|
||||
break
|
||||
|
||||
if db_result is None:
|
||||
return None
|
||||
result = dict(zip(fields, db_result))
|
||||
|
||||
for k in result.keys():
|
||||
result[k] = self.crypt.decrypt_field(k, result.get(k))
|
||||
return result
|
||||
|
||||
|
||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||
|
||||
@@ -128,7 +128,7 @@ class EF_AD(TransparentEF):
|
||||
cell_test = 0x04
|
||||
|
||||
def __init__(self, fid='6f43', sfid=None, name='EF.AD',
|
||||
desc='Administrative Data', size=(3, None), **kwargs):
|
||||
desc='Service Provider Name', size=(3, None), **kwargs):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, **kwargs)
|
||||
self._construct = Struct(
|
||||
# Byte 1: Display Condition
|
||||
|
||||
@@ -54,8 +54,6 @@ def compile_asn1_subdir(subdir_name:str, codec='der'):
|
||||
__ver = sys.version_info
|
||||
if (__ver.major, __ver.minor) >= (3, 9):
|
||||
for i in resources.files('pySim.esim').joinpath('asn1').joinpath(subdir_name).iterdir():
|
||||
if not i.name.endswith('.asn'):
|
||||
continue
|
||||
asn_txt += i.read_text()
|
||||
asn_txt += "\n"
|
||||
#else:
|
||||
|
||||
@@ -16,12 +16,6 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import requests
|
||||
from klein import Klein
|
||||
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
|
||||
from twisted.internet.posixbase import PosixReactorBase
|
||||
from pathlib import Path
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import time
|
||||
@@ -33,7 +27,7 @@ logger.setLevel(logging.DEBUG)
|
||||
|
||||
class param:
|
||||
class Iccid(ApiParamString):
|
||||
"""String representation of 18 to 20 digits, where the 20th digit MAY optionally be the padding
|
||||
"""String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
|
||||
character F."""
|
||||
@classmethod
|
||||
def _encode(cls, data):
|
||||
@@ -46,7 +40,7 @@ class param:
|
||||
|
||||
@classmethod
|
||||
def verify_encoded(cls, data):
|
||||
if len(data) not in (18, 19, 20):
|
||||
if len(data) not in [19, 20]:
|
||||
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
|
||||
|
||||
@classmethod
|
||||
@@ -59,7 +53,7 @@ class param:
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
data = str(data)
|
||||
if len(data) not in (18, 19, 20):
|
||||
if len(data) not in [19, 20]:
|
||||
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
|
||||
if len(data) == 19:
|
||||
decimal_part = data
|
||||
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
class DownloadOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/downloadOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType
|
||||
}
|
||||
input_mandatory = ['header']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'iccid': param.Iccid,
|
||||
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
|
||||
class ConfirmOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/confirmOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
'smdsAddress': param.SmdsAddress,
|
||||
'releaseFlag': param.ReleaseFlag,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'releaseFlag']
|
||||
input_mandatory = ['iccid', 'releaseFlag']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'eid': param.Eid,
|
||||
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
class CancelOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/cancelOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
|
||||
}
|
||||
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
|
||||
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
|
||||
class ReleaseProfile(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/releaseProfile'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid']
|
||||
input_mandatory = ['iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
|
||||
class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType,
|
||||
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
'notificationPointStatus': param.NotificationPointStatus,
|
||||
'resultData': param.ResultData,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
expected_http_status = 204
|
||||
|
||||
|
||||
class Es2pApiClient:
|
||||
"""Main class representing a full ES2+ API client. Has one method for each API function."""
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
|
||||
@@ -217,17 +206,18 @@ class Es2pApiClient:
|
||||
if client_cert:
|
||||
self.session.cert = client_cert
|
||||
|
||||
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
|
||||
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
|
||||
|
||||
def call_downloadOrder(self, data: dict) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(data, self._gen_func_id())
|
||||
@@ -247,116 +237,3 @@ class Es2pApiClient:
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
|
||||
|
||||
class Es2pApiServerHandlerSmdpp(abc.ABC):
|
||||
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_downloadOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_confirmOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_cancelOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_releaseProfile(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServerHandlerMno(abc.ABC):
|
||||
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServer(abc.ABC):
|
||||
"""Main class representing a full ES2+ API server. Has one method for each API function."""
|
||||
app = None
|
||||
|
||||
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
|
||||
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
|
||||
self.port = port
|
||||
self.interface = interface
|
||||
if server_cert:
|
||||
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
|
||||
else:
|
||||
self.server_cert = None
|
||||
if client_cert_verify:
|
||||
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
|
||||
else:
|
||||
self.client_cert_verify = None
|
||||
|
||||
def reactor(self, reactor: PosixReactorBase):
|
||||
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
|
||||
if self.server_cert:
|
||||
if self.client_cert_verify:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
|
||||
return defer.Deferred()
|
||||
|
||||
class Es2pApiServerSmdpp(Es2pApiServer):
|
||||
"""ES2+ (SMDP+ side) API Server."""
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
|
||||
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
|
||||
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
|
||||
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(DownloadOrder.path)
|
||||
def call_downloadOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(request)
|
||||
|
||||
@app.route(ConfirmOrder.path)
|
||||
def call_confirmOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
return self.confirmOrder.call(request)
|
||||
|
||||
@app.route(CancelOrder.path)
|
||||
def call_cancelOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
return self.cancelOrder.call(request)
|
||||
|
||||
@app.route(ReleaseProfile.path)
|
||||
def call_releaseProfile(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
return self.releaseProfile.call(request)
|
||||
|
||||
class Es2pApiServerMno(Es2pApiServer):
|
||||
"""ES2+ (MNO side) API Server."""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
|
||||
handler.call_handleDownloadProgressInfo)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(HandleDownloadProgressInfo.path)
|
||||
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(request)
|
||||
|
||||
@@ -155,11 +155,11 @@ class Es9pApiClient:
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
|
||||
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
|
||||
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
|
||||
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
@@ -21,8 +21,6 @@ import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
import base64
|
||||
from twisted.web.server import Request
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
|
||||
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
||||
raise ValueError('Unknown/unspecified status "%s"' % status)
|
||||
|
||||
class JsonRequestHeader(ApiParam):
|
||||
"""SGP.22 section 6.5.1.3."""
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
func_req_id = data.get('functionRequesterIdentifier')
|
||||
if not func_req_id:
|
||||
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
||||
func_call_id = data.get('functionCallIdentifier')
|
||||
if not func_call_id:
|
||||
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
||||
|
||||
class HttpStatusError(Exception):
|
||||
pass
|
||||
@@ -161,8 +149,7 @@ class ApiError(Exception):
|
||||
'message': None,
|
||||
}
|
||||
actual_sec = func_ex_status.get('statusCodeData', None)
|
||||
if actual_sec:
|
||||
sec.update(actual_sec)
|
||||
sec.update(actual_sec)
|
||||
self.subject_code = sec['subjectCode']
|
||||
self.reason_code = sec['reasonCode']
|
||||
self.subject_id = sec['subjectIdentifier']
|
||||
@@ -173,118 +160,65 @@ class ApiError(Exception):
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
||||
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
||||
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
||||
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
||||
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
||||
# prefix.
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder')
|
||||
path = None
|
||||
|
||||
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
||||
input_params = {}
|
||||
|
||||
# list of mandatory input parameters
|
||||
input_mandatory = []
|
||||
|
||||
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
||||
output_params = {}
|
||||
|
||||
# list of mandatory output parameters (for successful response)
|
||||
output_mandatory = []
|
||||
|
||||
# list of mandatory output parameters (for failed response)
|
||||
output_mandatory_failed = []
|
||||
|
||||
# expected HTTP status code of the response
|
||||
expected_http_status = 200
|
||||
|
||||
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
||||
http_method = 'POST'
|
||||
|
||||
# additional custom HTTP headers (client requests)
|
||||
extra_http_req_headers = {}
|
||||
|
||||
# additional custom HTTP headers (server responses)
|
||||
extra_http_res_headers = {}
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def __new__(cls, *args, role = 'legacy_client', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
args: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
|
||||
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
"""
|
||||
|
||||
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
|
||||
# decode_ methods below). The dictionary will not include any dunder/magic methods
|
||||
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
|
||||
|
||||
# Normal instantiation as JsonHttpApiFunction:
|
||||
if len(args) == 0 and len(kwargs) == 0:
|
||||
return type(cls.__name__, (abc.ABC,), cls_attr)()
|
||||
|
||||
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
|
||||
if role == 'legacy_client':
|
||||
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
|
||||
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
|
||||
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
|
||||
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
|
||||
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
result.legacy = True
|
||||
return result
|
||||
elif role == 'client':
|
||||
# Create a JsonHttpApiFunction in client role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
elif role == 'server':
|
||||
# Create a JsonHttpApiFunction in server role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
|
||||
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
else:
|
||||
raise ValueError('Invalid role \'%s\' specified' % role)
|
||||
|
||||
def encode_client(self, data: dict) -> dict:
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
|
||||
# pylint: disable=no-member
|
||||
if hasattr(self, 'legacy') and self.legacy:
|
||||
output[p] = JsonRequestHeader.encode(v)
|
||||
else:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_client(self, data: dict) -> dict:
|
||||
def decode(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
if 'header' in self.output_params:
|
||||
# let's first do the header, it's special
|
||||
if not 'header' in data:
|
||||
raise ValueError('Mandatory output parameter "header" missing')
|
||||
hdr_class = self.output_params.get('header')
|
||||
output['header'] = hdr_class.decode(data['header'])
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
# we can only expect mandatory parameters to be present in case of successful execution
|
||||
for p in self.output_mandatory:
|
||||
if p == 'header':
|
||||
continue
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
@@ -296,167 +230,30 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def encode_server(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.output_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_server(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the request body."""
|
||||
output = {}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
class JsonHttpApiClient():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
||||
session: requests.Session):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
|
||||
func_req_id : function requestor id to use for requests
|
||||
session : session object (requests)
|
||||
"""
|
||||
self.api_func = api_func
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
||||
json-serializable dict. Output data is returned as json-deserialized dict."""
|
||||
|
||||
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
||||
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
||||
# field is checked by the encode_client method)
|
||||
if func_call_id:
|
||||
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_client(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
url = self.url_prefix + self.path
|
||||
encoded = json.dumps(self.encode(data, func_call_id))
|
||||
req_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
req_headers.update(self.api_func.extra_http_req_headers)
|
||||
req_headers.update(self.extra_http_req_headers)
|
||||
|
||||
# Perform HTTP request
|
||||
url = self.url_prefix + self.api_func.path
|
||||
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
||||
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
||||
logger.debug("HTTP RSP: %s" % (response.content))
|
||||
|
||||
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
||||
# SGP.22, section 6.5.1)
|
||||
if response.status_code != self.api_func.expected_http_status:
|
||||
if response.status_code != self.expected_http_status:
|
||||
raise HttpStatusError(response)
|
||||
if response.content and not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
raise HttpHeaderError(response)
|
||||
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
||||
raise HttpHeaderError(response)
|
||||
|
||||
# Decode response and return the result back to the caller
|
||||
if response.content:
|
||||
output = self.api_func.decode_client(response.json())
|
||||
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
||||
# (the presence of the header field is checked by the decode_client method)
|
||||
if 'header' in output:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
return output
|
||||
return self.decode(response.json())
|
||||
return None
|
||||
|
||||
class JsonHttpApiServer():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
call_handler : handler function to process the request. This function must accept the
|
||||
decoded request as a dictionary. The handler function must return a tuple consisting
|
||||
of the response in the form of a dictionary (may be empty), and a function execution
|
||||
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
||||
"""
|
||||
self.api_func = api_func
|
||||
if call_handler:
|
||||
self.call_handler = call_handler
|
||||
else:
|
||||
self.call_handler = self.default_handler
|
||||
|
||||
def default_handler(self, data: dict) -> (dict, str):
|
||||
"""default handler, used in case no call handler is provided."""
|
||||
logger.error("no handler function for request: %s" % str(data))
|
||||
return {}, 'Failed'
|
||||
|
||||
def call(self, request: Request) -> str:
|
||||
""" Process an incoming request.
|
||||
Args:
|
||||
request : request object as received using twisted.web.server
|
||||
Returns:
|
||||
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
||||
provided the request object)
|
||||
"""
|
||||
|
||||
# Make sure the request is done with the correct HTTP method
|
||||
if (request.method.decode() != self.api_func.http_method):
|
||||
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
||||
|
||||
# Decode the request
|
||||
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
||||
|
||||
# Run call handler (see above)
|
||||
data, fe_status = self.call_handler(decoded_request)
|
||||
|
||||
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
||||
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
||||
if fe_status:
|
||||
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_server(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
res_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
res_headers.update(self.api_func.extra_http_res_headers)
|
||||
for header, value in res_headers.items():
|
||||
request.setHeader(header, value)
|
||||
request.setResponseCode(self.api_func.expected_http_status)
|
||||
|
||||
# Return the encoded result back to the caller for sending (using twisted/klein)
|
||||
return encoded
|
||||
|
||||
|
||||
@@ -21,8 +21,6 @@ import io
|
||||
import os
|
||||
from typing import Tuple, List, Optional, Dict, Union
|
||||
from collections import OrderedDict
|
||||
from difflib import SequenceMatcher, Match
|
||||
|
||||
import asn1tools
|
||||
import zipfile
|
||||
from pySim import javacard
|
||||
@@ -46,29 +44,6 @@ asn1 = compile_asn1_subdir('saip')
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class NonMatch(Match):
|
||||
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
|
||||
@classmethod
|
||||
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
|
||||
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
|
||||
The caller must ensure that the input list is ordered, non-overlapping and only contains
|
||||
matches at equal offsets in a and b."""
|
||||
res = []
|
||||
cur = 0
|
||||
for match in l:
|
||||
if match.a != match.b:
|
||||
raise ValueError('only works for equal-offset matches')
|
||||
assert match.a >= cur
|
||||
nm_len = match.a - cur
|
||||
if nm_len > 0:
|
||||
# there's no point in generating zero-lenth non-matching sections
|
||||
res.append(cls(a=cur, b=cur, size=nm_len))
|
||||
cur = match.a + match.size
|
||||
if size > cur:
|
||||
res.append(cls(a=cur, b=cur, size=size-cur))
|
||||
|
||||
return res
|
||||
|
||||
class Naa:
|
||||
"""A class defining a Network Access Application (NAA)"""
|
||||
name = None
|
||||
@@ -151,8 +126,6 @@ class File:
|
||||
self.df_name = None
|
||||
self.fill_pattern = None
|
||||
self.fill_pattern_repeat = False
|
||||
self.pstdo = None # pinStatusTemplateDO, mandatory for DF/ADF
|
||||
self.lcsi = None # optional life cycle status indicator
|
||||
# apply some defaults from profile
|
||||
if self.template:
|
||||
self.from_template(self.template)
|
||||
@@ -171,9 +144,6 @@ class File:
|
||||
def file_size(self) -> Optional[int]:
|
||||
"""Return the size of the file in bytes."""
|
||||
if self.file_type in ['LF', 'CY']:
|
||||
if self._file_size and self.nb_rec is None and self.rec_len:
|
||||
self.nb_rec = self._file_size // self.rec_len
|
||||
|
||||
return self.nb_rec * self.rec_len
|
||||
elif self.file_type in ['TR', 'BT']:
|
||||
return self._file_size
|
||||
@@ -213,7 +183,7 @@ class File:
|
||||
self.file_type = template.file_type
|
||||
self.fid = template.fid
|
||||
self.sfi = template.sfi
|
||||
self.arr = template.arr.to_bytes(1, 'big')
|
||||
self.arr = template.arr.to_bytes(1)
|
||||
if hasattr(template, 'rec_len'):
|
||||
self.rec_len = template.rec_len
|
||||
else:
|
||||
@@ -257,7 +227,7 @@ class File:
|
||||
fileDescriptor['shortEFID'] = bytes([self.sfi])
|
||||
if self.df_name:
|
||||
fileDescriptor['dfName'] = self.df_name
|
||||
if self.arr and self.arr != self.template.arr.to_bytes(1, 'big'):
|
||||
if self.arr and self.arr != self.template.arr.to_bytes(1):
|
||||
fileDescriptor['securityAttributesReferenced'] = self.arr
|
||||
if self.file_type in ['LF', 'CY']:
|
||||
fdb_dec['file_type'] = 'working_ef'
|
||||
@@ -280,8 +250,6 @@ class File:
|
||||
elif self.file_type in ['MF', 'DF', 'ADF']:
|
||||
fdb_dec['file_type'] = 'df'
|
||||
fdb_dec['structure'] = 'no_info_given'
|
||||
# pinStatusTemplateDO is mandatory for DF/ADF
|
||||
fileDescriptor['pinStatusTemplateDO'] = self.pstdo
|
||||
# build file descriptor based on above input data
|
||||
fd_dict = {}
|
||||
if len(fdb_dec):
|
||||
@@ -296,7 +264,7 @@ class File:
|
||||
if self.read_and_update_when_deact:
|
||||
spfi |= 0x40 # TS 102 222 Table 5
|
||||
if spfi != 0x00:
|
||||
pefi['specialFileInformation'] = spfi.to_bytes(1, 'big')
|
||||
pefi['specialFileInformation'] = spfi.to_bytes(1)
|
||||
if self.fill_pattern:
|
||||
if not self.fill_pattern_repeat:
|
||||
pefi['fillPattern'] = self.fill_pattern
|
||||
@@ -308,8 +276,6 @@ class File:
|
||||
# desired fill or repeat pattern in the "proprietaryEFInfo" element for the EF in Profiles
|
||||
# downloaded to a V2.2 or earlier eUICC.
|
||||
fileDescriptor['proprietaryEFInfo'] = pefi
|
||||
if self.lcsi:
|
||||
fileDescriptor['lcsi'] = self.lcsi
|
||||
logger.debug("%s: to_fileDescriptor(%s)" % (self, fileDescriptor))
|
||||
return fileDescriptor
|
||||
|
||||
@@ -325,12 +291,6 @@ class File:
|
||||
dfName = fileDescriptor.get('dfName', None)
|
||||
if dfName:
|
||||
self.df_name = dfName
|
||||
efFileSize = fileDescriptor.get('efFileSize', None)
|
||||
if efFileSize:
|
||||
self._file_size = self._decode_file_size(efFileSize)
|
||||
|
||||
self.pstdo = fileDescriptor.get('pinStatusTemplateDO', None)
|
||||
self.lcsi = fileDescriptor.get('lcsi', None)
|
||||
pefi = fileDescriptor.get('proprietaryEFInfo', {})
|
||||
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
|
||||
if securityAttributesReferenced:
|
||||
@@ -340,11 +300,13 @@ class File:
|
||||
fdb_dec = fd_dec['file_descriptor_byte']
|
||||
self.shareable = fdb_dec['shareable']
|
||||
if fdb_dec['file_type'] == 'working_ef':
|
||||
efFileSize = fileDescriptor.get('efFileSize', None)
|
||||
if fd_dec['num_of_rec']:
|
||||
self.nb_rec = fd_dec['num_of_rec']
|
||||
if fd_dec['record_len']:
|
||||
self.rec_len = fd_dec['record_len']
|
||||
if efFileSize:
|
||||
self._file_size = self._decode_file_size(efFileSize)
|
||||
if self.rec_len and self.nb_rec == None:
|
||||
# compute the number of records from file size and record length
|
||||
self.nb_rec = self._file_size // self.rec_len
|
||||
@@ -444,40 +406,12 @@ class File:
|
||||
return ValueError("Unknown key '%s' in tuple list" % k)
|
||||
return stream.getvalue()
|
||||
|
||||
def file_content_to_tuples(self, optimize:bool = False) -> List[Tuple]:
|
||||
"""Encode the file contents into a list of fillFileContent / fillFileOffset tuples that can be fed
|
||||
into the asn.1 encoder. If optimize is True, it will try to encode only the differences from the
|
||||
fillFileContent of the profile template. Otherwise, the entire file contents will be encoded
|
||||
as-is."""
|
||||
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
|
||||
return []
|
||||
if not optimize:
|
||||
# simplistic approach: encode the full file, ignoring the template/default
|
||||
return [('fillFileContent', self.body)]
|
||||
# Try to 'compress' the file body, based on the default file contents.
|
||||
if self.template:
|
||||
default = self.template.expand_default_value_pattern(length=len(self.body))
|
||||
if not default:
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
else:
|
||||
if default == self.body:
|
||||
# 100% match: return an empty tuple list to make eUICC use the default
|
||||
return []
|
||||
sm = SequenceMatcher(a=default, b=self.body)
|
||||
else:
|
||||
# no template at all: we can only remove padding
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
matching_blocks = sm.get_matching_blocks()
|
||||
# we can only make use of matches that have the same offset in 'a' and 'b'
|
||||
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
|
||||
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
|
||||
ret = []
|
||||
cur = 0
|
||||
for block in non_matching_blocks:
|
||||
ret.append(('fillFileOffset', block.a - cur))
|
||||
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
|
||||
cur += block.size
|
||||
return ret
|
||||
def file_content_to_tuples(self) -> List[Tuple]:
|
||||
# FIXME: simplistic approach. needs optimization. We should first check if the content
|
||||
# matches the expanded default value from the template. If it does, return empty list.
|
||||
# Next, we should compute the diff between the default value and self.body, and encode
|
||||
# that as a sequence of fillFileOffset and fillFileContent tuples.
|
||||
return [('fillFileContent', self.body)]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "File(%s)" % self.pe_name
|
||||
@@ -699,15 +633,8 @@ class FsProfileElement(ProfileElement):
|
||||
self.pe_sequence.cur_df = pe_df
|
||||
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
|
||||
|
||||
def file2pe(self, file: File):
|
||||
"""Update the "decoded" member for the given file with the contents from the given File instance.
|
||||
We expect that the File instance is part of self.files"""
|
||||
if self.files[file.pe_name] != file:
|
||||
raise ValueError("The file you passed is not part of this ProfileElement")
|
||||
self.decoded[file.pe_name] = file.to_tuples()
|
||||
|
||||
def files2pe(self):
|
||||
"""Update the "decoded" member for each file with the contents of the "files" member."""
|
||||
"""Update the "decoded" member with the contents of the "files" member."""
|
||||
for k, f in self.files.items():
|
||||
self.decoded[k] = f.to_tuples()
|
||||
|
||||
@@ -1079,6 +1006,13 @@ class SecurityDomainKey:
|
||||
'keyVersionNumber': bytes([self.key_version_number]),
|
||||
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
||||
|
||||
def get_key_component(self, key_type):
|
||||
for kc in self.key_components:
|
||||
if kc.key_type == key_type:
|
||||
return kc.key_data
|
||||
return None
|
||||
|
||||
|
||||
class ProfileElementSD(ProfileElement):
|
||||
"""Class representing a securityDomain ProfileElement."""
|
||||
type = 'securityDomain'
|
||||
@@ -1093,7 +1027,6 @@ class ProfileElementSD(ProfileElement):
|
||||
def __init__(self, decoded: Optional[dict] = None, **kwargs):
|
||||
super().__init__(decoded, **kwargs)
|
||||
if decoded:
|
||||
self._post_decode()
|
||||
return
|
||||
# provide some reasonable defaults for a MNO-SD
|
||||
self.decoded['instance'] = {
|
||||
@@ -1812,7 +1745,8 @@ class ProfileElementSequence:
|
||||
del hdr.decoded['eUICC-Mandatory-services'][service]
|
||||
# remove any associated mandatory filesystem templates
|
||||
for template in naa.templates:
|
||||
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
||||
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
|
||||
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
||||
# determine the ADF names (AIDs) of all NAA ADFs
|
||||
naa_adf_names = []
|
||||
if naa.pe_types[0] in self.pe_by_type:
|
||||
@@ -1855,7 +1789,7 @@ class ProfileElementSequence:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def peclass_for_path(path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
|
||||
def peclass_for_path(path: Path) -> Optional[ProfileElement]:
|
||||
"""Return the ProfileElement class that can contain a file with given path."""
|
||||
naa = ProfileElementSequence.naa_for_path(path)
|
||||
if naa:
|
||||
@@ -1888,7 +1822,7 @@ class ProfileElementSequence:
|
||||
return ProfileElementTelecom, ft
|
||||
return ProfileElementGFM, None
|
||||
|
||||
def pe_for_path(self, path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
|
||||
def pe_for_path(self, path: Path) -> Optional[ProfileElement]:
|
||||
"""Return the ProfileElement instance that can contain a file with matching path. This will
|
||||
either be an existing PE within the sequence, or it will be a newly-allocated PE that is
|
||||
inserted into the sequence."""
|
||||
@@ -1954,10 +1888,7 @@ class ProfileElementSequence:
|
||||
|
||||
|
||||
class FsNode:
|
||||
"""A node in the filesystem hierarchy. Each node can have a parent node and any number of children.
|
||||
Each node is identified uniquely within the parent by its numeric FID and its optional human-readable
|
||||
name. Each node usually is associated with an instance of the File class for the actual content of
|
||||
the file. FsNode is the base class used by more specific nodes, such as FsNode{EF,DF,ADF,MF}."""
|
||||
"""A node in the filesystem hierarchy."""
|
||||
def __init__(self, fid: int, parent: Optional['FsNode'], file: Optional[File] = None,
|
||||
name: Optional[str] = None):
|
||||
self.fid = fid
|
||||
@@ -2012,7 +1943,7 @@ class FsNode:
|
||||
return x
|
||||
|
||||
def walk(self, fn, **kwargs):
|
||||
"""call 'fn(self, ``**kwargs``) for the File."""
|
||||
"""call 'fn(self, **kwargs) for the File."""
|
||||
return [fn(self, **kwargs)]
|
||||
|
||||
class FsNodeEF(FsNode):
|
||||
@@ -2102,7 +2033,7 @@ class FsNodeDF(FsNode):
|
||||
return cur
|
||||
|
||||
def walk(self, fn, **kwargs):
|
||||
"""call 'fn(self, ``**kwargs``) for the DF and recursively for all children."""
|
||||
"""call 'fn(self, **kwargs) for the DF and recursively for all children."""
|
||||
ret = super().walk(fn, **kwargs)
|
||||
for c in self.children.values():
|
||||
ret += c.walk(fn, **kwargs)
|
||||
|
||||
237
pySim/esim/saip/param_source.py
Normal file
237
pySim/esim/saip/param_source.py
Normal file
@@ -0,0 +1,237 @@
|
||||
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
|
||||
#
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: nhofmeyr@sysmocom.de
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import secrets
|
||||
import re
|
||||
from pySim.utils import all_subclasses_of
|
||||
from osmocom.utils import b2h
|
||||
|
||||
class ParamSourceExn(Exception):
|
||||
pass
|
||||
|
||||
class ParamSourceExhaustedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSourceUndefinedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSource:
|
||||
'abstract parameter source. For usage, see personalization.BatchPersonalization.'
|
||||
is_abstract = True
|
||||
|
||||
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
|
||||
name = 'none'
|
||||
|
||||
@classmethod
|
||||
def get_all_implementations(cls, blacklist=None):
|
||||
"return all subclasses of ParamSource that have is_abstract = False."
|
||||
# return a set() so that multiple inheritance does not return dups
|
||||
return set(c
|
||||
for c in all_subclasses_of(cls)
|
||||
if (not c.is_abstract) and ((not blacklist) or (c not in blacklist))
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
'''Subclasses implement this:
|
||||
if a parameter source defines some string input magic, override this function.
|
||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
||||
so the user can enter '0000' to get a four digit random number.'''
|
||||
return cls(s)
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
'''Subclasses implement this: return the next value from the parameter source.
|
||||
When there are no more values from the source, raise a ParamSourceExhaustedExn.'''
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
|
||||
class ConstantSource(ParamSource):
|
||||
'one value for all'
|
||||
is_abstract = False
|
||||
name = 'constant'
|
||||
|
||||
def __init__(self, val:str):
|
||||
self.val = val
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
return self.val
|
||||
|
||||
class InputExpandingParamSource(ParamSource):
|
||||
|
||||
@classmethod
|
||||
def expand_str(cls, s:str):
|
||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||
if '*' not in s:
|
||||
return s
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
||||
if len(tokens) < 3:
|
||||
return s
|
||||
parts = []
|
||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||
parts.append(unchanged)
|
||||
repeat = int(repeat_str)
|
||||
parts.append(snippet * repeat)
|
||||
return ''.join(parts)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
return cls(cls.expand_str(s))
|
||||
|
||||
class RandomSourceMixin:
|
||||
random_impl = secrets.SystemRandom()
|
||||
|
||||
class RandomDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
'return a different sequence of random decimal digits each'
|
||||
is_abstract = False
|
||||
name = 'random decimal digits'
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
"""
|
||||
See also from_str().
|
||||
|
||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
||||
num_digits: number of random digits (possibly with leading zeros) to generate.
|
||||
first_value, last_value: the decimal range in which to provide random digits.
|
||||
"""
|
||||
num_digits = int(num_digits)
|
||||
first_value = int(first_value)
|
||||
last_value = int(last_value)
|
||||
assert num_digits > 0
|
||||
assert first_value <= last_value
|
||||
self.num_digits = num_digits
|
||||
self.val_first_last = (first_value, last_value)
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random digits that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randint(*self.val_first_last)
|
||||
if val in RandomDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomDigitSource.used_keys.add(val)
|
||||
break
|
||||
return self.val_to_digit(val)
|
||||
|
||||
def val_to_digit(self, val:int):
|
||||
return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
|
||||
if '..' in s:
|
||||
first_str, last_str = s.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = s.strip()
|
||||
last_str = None
|
||||
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str) if last_str is not None else '9' * len(first_str)
|
||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
||||
|
||||
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
'return a different sequence of random hexadecimal digits each'
|
||||
is_abstract = False
|
||||
name = 'random hexadecimal digits'
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits):
|
||||
'see from_str()'
|
||||
num_digits = int(num_digits)
|
||||
if num_digits < 1:
|
||||
raise ValueError('zero number of digits')
|
||||
# hex digits always come in two
|
||||
if (num_digits & 1) != 0:
|
||||
raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}')
|
||||
self.num_digits = num_digits
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random bytes that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randbytes(self.num_digits // 2)
|
||||
if val in RandomHexDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomHexDigitSource.used_keys.add(val)
|
||||
break
|
||||
|
||||
return b2h(val)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
return cls(num_digits=len(s.strip()))
|
||||
|
||||
class IncDigitSource(RandomDigitSource):
|
||||
'incrementing sequence of digits'
|
||||
is_abstract = False
|
||||
name = 'incrementing decimal digits'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"The arguments defining the number of digits and value range are identical to RandomDigitSource.__init__()."
|
||||
super().__init__(*args, **kwargs)
|
||||
self.next_val = None
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"Restart from the first value of the defined range passed to __init__()."
|
||||
self.next_val = self.val_first_last[0]
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = self.next_val
|
||||
if val is None:
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
returnval = self.val_to_digit(val)
|
||||
|
||||
val += 1
|
||||
if val > self.val_first_last[1]:
|
||||
self.next_val = None
|
||||
else:
|
||||
self.next_val = val
|
||||
|
||||
return returnval
|
||||
|
||||
class CsvSource(ParamSource):
|
||||
'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)'
|
||||
is_abstract = False
|
||||
name = 'from CSV'
|
||||
|
||||
def __init__(self, csv_column):
|
||||
"""
|
||||
csv_column: column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
self.csv_column = csv_column
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = None
|
||||
if csv_row:
|
||||
val = csv_row.get(self.csv_column)
|
||||
if not val:
|
||||
raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}')
|
||||
return val
|
||||
File diff suppressed because it is too large
Load Diff
@@ -673,7 +673,7 @@ class FilesUsimDf5GS(ProfileTemplate):
|
||||
FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]),
|
||||
FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]),
|
||||
FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]),
|
||||
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130], pe_name='ef-supinai'),
|
||||
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130]),
|
||||
FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]),
|
||||
]
|
||||
|
||||
@@ -818,7 +818,7 @@ class FilesIsimOptional(ProfileTemplate):
|
||||
base_path = Path('ADF.ISIM')
|
||||
extends = FilesIsimMandatory
|
||||
files = [
|
||||
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5], pe_name='ef-pcscf'),
|
||||
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5]),
|
||||
FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]),
|
||||
FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]),
|
||||
FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]),
|
||||
|
||||
@@ -181,7 +181,7 @@ class SeqNumber(BER_TLV_IE, tag=0x80):
|
||||
class NotificationAddress(BER_TLV_IE, tag=0x0c):
|
||||
_construct = Utf8Adapter(GreedyBytes)
|
||||
class Iccid(BER_TLV_IE, tag=0x5a):
|
||||
_construct = PaddedBcdAdapter(GreedyBytes)
|
||||
_construct = BcdAdapter(GreedyBytes)
|
||||
class NotificationMetadata(BER_TLV_IE, tag=0xbf2f, nested=[SeqNumber, ProfileMgmtOperation,
|
||||
NotificationAddress, Iccid]):
|
||||
pass
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# GlobalPlatform install parameter generator
|
||||
#
|
||||
# (C) 2024 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2024 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
|
||||
@@ -266,13 +266,11 @@ class SCP02(SCP):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def dek_encrypt(self, plaintext:bytes) -> bytes:
|
||||
# See also GPC section B.1.1.2, E.4.7, and E.4.1
|
||||
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
|
||||
cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
|
||||
return cipher.encrypt(plaintext)
|
||||
|
||||
def dek_decrypt(self, ciphertext:bytes) -> bytes:
|
||||
# See also GPC section B.1.1.2, E.4.7, and E.4.1
|
||||
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
|
||||
cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
|
||||
return cipher.decrypt(ciphertext)
|
||||
|
||||
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
|
||||
|
||||
@@ -91,6 +91,7 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
||||
|
||||
# Key Usage:
|
||||
# KVN 0x01 .. 0x0F reserved for SCP80
|
||||
# KVN 0x81 .. 0x8f reserved for SCP81
|
||||
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
||||
# KVN 0x20 .. 0x2F reserved for SCP02
|
||||
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# JavaCard related utilities
|
||||
#
|
||||
# (C) 2024 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2024 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"""
|
||||
|
||||
#
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
@@ -44,7 +44,7 @@ class PySimLogger:
|
||||
"""
|
||||
|
||||
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
||||
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- " + LOG_FMTSTR
|
||||
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
|
||||
__formatter = logging.Formatter(LOG_FMTSTR)
|
||||
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
||||
|
||||
@@ -108,10 +108,7 @@ class PySimLogger:
|
||||
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
||||
color = PySimLogger.colors.get(record.levelno)
|
||||
if color:
|
||||
if isinstance(color, str):
|
||||
PySimLogger.print_callback(color + formatted_message + "\033[0m")
|
||||
else:
|
||||
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||
else:
|
||||
PySimLogger.print_callback(formatted_message)
|
||||
|
||||
|
||||
@@ -57,13 +57,12 @@ CompactRemoteResp = Struct('number_of_commands'/Int8ub,
|
||||
'last_response_data'/HexAdapter(GreedyBytes))
|
||||
|
||||
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
|
||||
CNTR_REQ = Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, counter_must_be_higher=2, counter_must_be_lower=3)
|
||||
POR_REQ = Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2)
|
||||
|
||||
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
|
||||
SPI = BitStruct( # first octet
|
||||
Padding(3),
|
||||
'counter'/CNTR_REQ,
|
||||
'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
|
||||
counter_must_be_higher=2, counter_must_be_lower=3),
|
||||
'ciphering'/Flag,
|
||||
'rc_cc_ds'/RC_CC_DS,
|
||||
# second octet
|
||||
@@ -71,7 +70,8 @@ SPI = BitStruct( # first octet
|
||||
'por_in_submit'/Flag,
|
||||
'por_shall_be_ciphered'/Flag,
|
||||
'por_rc_cc_ds'/RC_CC_DS,
|
||||
'por'/POR_REQ
|
||||
'por'/Enum(BitsInteger(2), no_por=0,
|
||||
por_required=1, por_only_when_error=2)
|
||||
)
|
||||
|
||||
# TS 102 225 Section 5.1.2
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"""
|
||||
|
||||
#
|
||||
# (C) 2021 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2021 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
# coding=utf-8
|
||||
|
||||
"""Representation of the runtime state of an application like pySim-shell.
|
||||
"""
|
||||
|
||||
@@ -26,7 +25,7 @@ from pySim.exceptions import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
log = PySimLogger.get(__name__)
|
||||
log = PySimLogger.get("RUNTIME")
|
||||
|
||||
def lchan_nr_from_cla(cla: int) -> int:
|
||||
"""Resolve the logical channel number from the CLA byte."""
|
||||
@@ -116,7 +115,7 @@ class RuntimeState:
|
||||
for a in aids_unknown:
|
||||
log.info(" unknown: %s (EF.DIR)" % a)
|
||||
else:
|
||||
log.warning("EF.DIR seems to be empty!")
|
||||
log.warn("EF.DIR seems to be empty!")
|
||||
|
||||
# Some card applications may not be registered in EF.DIR, we will actively
|
||||
# probe for those applications
|
||||
@@ -557,8 +556,8 @@ class RuntimeLchan:
|
||||
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
|
||||
(data_len, writeable_name, writeable_size, data_len - writeable_size))
|
||||
elif data_len < writeable_size:
|
||||
log.warning("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
|
||||
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
|
||||
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
|
||||
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
|
||||
|
||||
def update_binary(self, data_hex: str, offset: int = 0):
|
||||
"""Update transparent EF binary data.
|
||||
|
||||
@@ -3,6 +3,18 @@
|
||||
""" pySim: PCSC reader transport link base
|
||||
"""
|
||||
|
||||
import os
|
||||
import abc
|
||||
import argparse
|
||||
from typing import Optional, Tuple
|
||||
from construct import Construct
|
||||
from osmocom.utils import b2h, h2b, i2h, Hexstr
|
||||
|
||||
from pySim.exceptions import *
|
||||
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
|
||||
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
|
||||
|
||||
#
|
||||
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
||||
# Copyright (C) 2021-2023 Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
@@ -18,20 +30,8 @@
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import os
|
||||
import abc
|
||||
import argparse
|
||||
from typing import Optional, Tuple
|
||||
from construct import Construct
|
||||
from osmocom.utils import b2h, h2b, i2h, Hexstr
|
||||
|
||||
from pySim.exceptions import *
|
||||
from pySim.utils import SwHexstr, SwMatchstr, ResTuple, sw_match, parse_command_apdu
|
||||
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
|
||||
from pySim.log import PySimLogger
|
||||
|
||||
log = PySimLogger.get(__name__)
|
||||
|
||||
class ApduTracer:
|
||||
def trace_command(self, cmd):
|
||||
@@ -46,11 +46,11 @@ class ApduTracer:
|
||||
class StdoutApduTracer(ApduTracer):
|
||||
"""Minimalistic APDU tracer, printing commands to stdout."""
|
||||
def trace_response(self, cmd, sw, resp):
|
||||
log.info("-> %s %s", cmd[:10], cmd[10:])
|
||||
log.info("<- %s: %s", sw, resp)
|
||||
print("-> %s %s" % (cmd[:10], cmd[10:]))
|
||||
print("<- %s: %s" % (sw, resp))
|
||||
|
||||
def trace_reset(self):
|
||||
log.info("-- RESET")
|
||||
print("-- RESET")
|
||||
|
||||
class ProactiveHandler(abc.ABC):
|
||||
"""Abstract base class representing the interface of some code that handles
|
||||
@@ -177,7 +177,7 @@ class LinkBase(abc.ABC):
|
||||
if self.apdu_strict:
|
||||
raise ValueError(exeption_str)
|
||||
else:
|
||||
log.warning(exeption_str)
|
||||
print('Warning: %s' % exeption_str)
|
||||
|
||||
return (data, sw)
|
||||
|
||||
@@ -211,7 +211,7 @@ class LinkBase(abc.ABC):
|
||||
# parse the proactive command
|
||||
pcmd = ProactiveCommand()
|
||||
parsed = pcmd.from_tlv(h2b(fetch_rv[0]))
|
||||
log.info("FETCH: %s (%s)", fetch_rv[0], type(parsed).__name__)
|
||||
print("FETCH: %s (%s)" % (fetch_rv[0], type(parsed).__name__))
|
||||
if self.proactive_handler:
|
||||
# Extension point: If this does return a list of TLV objects,
|
||||
# they could be appended after the Result; if the first is a
|
||||
@@ -361,13 +361,13 @@ def init_reader(opts, **kwargs) -> LinkBase:
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
sl = ModemATCommandLink(opts, **kwargs)
|
||||
else: # Serial reader is default
|
||||
log.warning("No reader/driver specified; falling back to default (Serial reader)")
|
||||
print("No reader/driver specified; falling back to default (Serial reader)")
|
||||
from pySim.transport.serial import SerialSimLink
|
||||
sl = SerialSimLink(opts, **kwargs)
|
||||
|
||||
if os.environ.get('PYSIM_INTEGRATION_TEST') == "1":
|
||||
log.info("Using %s reader interface" % (sl.name))
|
||||
print("Using %s reader interface" % (sl.name))
|
||||
else:
|
||||
log.info("Using reader %s" % sl)
|
||||
print("Using reader %s" % sl)
|
||||
|
||||
return sl
|
||||
|
||||
@@ -166,7 +166,7 @@ class ModemATCommandLink(LinkBaseTpdu):
|
||||
|
||||
# Make sure that the response has format: b'+CSIM: %d,\"%s\"'
|
||||
try:
|
||||
result = re.match(rb'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
|
||||
result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
|
||||
(_rsp_tpdu_len, rsp_tpdu) = result.groups()
|
||||
except Exception as exc:
|
||||
raise ReaderError('Failed to parse response from modem: %s' % rsp) from exc
|
||||
|
||||
@@ -486,17 +486,17 @@ class EF_UST(EF_UServiceTable):
|
||||
# TS 31.103 Section 4.2.7 - *not* the same as DF.GSM/EF.ECC!
|
||||
class EF_ECC(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '19f1ff01', { "call_code": "911",
|
||||
( '19f1ff01', { "call_code": "911f",
|
||||
"service_category": { "police": True, "ambulance": False, "fire_brigade": False,
|
||||
"marine_guard": False, "mountain_rescue": False,
|
||||
"manual_ecall": False, "automatic_ecall": False } } ),
|
||||
( '19f3ff02', { "call_code": "913",
|
||||
( '19f3ff02', { "call_code": "913f",
|
||||
"service_category": { "police": False, "ambulance": True, "fire_brigade": False,
|
||||
"marine_guard": False, "mountain_rescue": False,
|
||||
"manual_ecall": False, "automatic_ecall": False } } ),
|
||||
]
|
||||
_test_no_pad = True
|
||||
cc_construct = PaddedBcdAdapter(Rpad(Bytes(3)))
|
||||
cc_construct = BcdAdapter(Rpad(Bytes(3)))
|
||||
category_construct = FlagsEnum(Byte, police=1, ambulance=2, fire_brigade=3, marine_guard=4,
|
||||
mountain_rescue=5, manual_ecall=6, automatic_ecall=7)
|
||||
alpha_construct = GsmOrUcs2Adapter(Rpad(GreedyBytes))
|
||||
@@ -596,7 +596,7 @@ class EF_ICI(CyclicEF):
|
||||
self._construct = Struct('alpha_id'/Bytes(this._.total_len-28),
|
||||
'len_of_bcd_contents'/Int8ub,
|
||||
'ton_npi'/Int8ub,
|
||||
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))),
|
||||
'call_number'/BcdAdapter(Bytes(10)),
|
||||
'cap_cfg2_record_id'/Int8ub,
|
||||
'ext5_record_id'/Int8ub,
|
||||
'date_and_time'/BcdAdapter(Bytes(7)),
|
||||
@@ -612,7 +612,7 @@ class EF_OCI(CyclicEF):
|
||||
self._construct = Struct('alpha_id'/Bytes(this._.total_len-27),
|
||||
'len_of_bcd_contents'/Int8ub,
|
||||
'ton_npi'/Int8ub,
|
||||
'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))),
|
||||
'call_number'/BcdAdapter(Bytes(10)),
|
||||
'cap_cfg2_record_id'/Int8ub,
|
||||
'ext5_record_id'/Int8ub,
|
||||
'date_and_time'/BcdAdapter(Bytes(7)),
|
||||
@@ -1118,7 +1118,7 @@ class EF_Routing_Indicator(TransparentEF):
|
||||
# responsibility of home network operator but BCD coding shall be used. If a network
|
||||
# operator decides to assign less than 4 digits to Routing Indicator, the remaining digits
|
||||
# shall be coded as "1111" to fill the 4 digits coding of Routing Indicator
|
||||
self._construct = Struct('routing_indicator'/PaddedBcdAdapter(Rpad(Bytes(2))),
|
||||
self._construct = Struct('routing_indicator'/Rpad(BcdAdapter(Bytes(2)), 'f', 2),
|
||||
'rfu'/Bytes(2))
|
||||
|
||||
# TS 31.102 Section 4.4.11.13 (Rel 16)
|
||||
|
||||
@@ -40,7 +40,6 @@ from osmocom.utils import *
|
||||
from osmocom.construct import *
|
||||
|
||||
from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_plmn, dec_xplmn_w_act
|
||||
from pySim.utils import bytes_for_nibbles
|
||||
from pySim.profile import CardProfile, CardProfileAddon
|
||||
from pySim.filesystem import *
|
||||
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
|
||||
@@ -152,7 +151,7 @@ class EF_ADN(LinFixedEF):
|
||||
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
|
||||
'len_of_bcd'/Int8ub,
|
||||
'ton_npi'/TonNpi,
|
||||
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))),
|
||||
'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
|
||||
'cap_conf_id'/Int8ub,
|
||||
ext_name/Int8ub)
|
||||
|
||||
@@ -193,11 +192,11 @@ class EF_MSISDN(LinFixedEF):
|
||||
( 'ffffffffffffffffffffffffffffffffffffffff04b12143f5ffffffffffffffffff',
|
||||
{"alpha_id": "", "len_of_bcd": 4, "ton_npi": {"ext": True, "type_of_number": "network_specific",
|
||||
"numbering_plan_id": "isdn_e164"},
|
||||
"dialing_nr": "12345"}),
|
||||
"dialing_nr": "12345f"}),
|
||||
( '456967656e65205275666e756d6d6572ffffffff0891947172199181f3ffffffffff',
|
||||
{"alpha_id": "Eigene Rufnummer", "len_of_bcd": 8, "ton_npi": {"ext": True, "type_of_number": "international",
|
||||
"numbering_plan_id": "isdn_e164"},
|
||||
"dialing_nr": "4917279119183"}),
|
||||
"dialing_nr": "4917279119183f"}),
|
||||
]
|
||||
|
||||
# Ensure deprecated representations still work
|
||||
@@ -215,7 +214,7 @@ class EF_MSISDN(LinFixedEF):
|
||||
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-14)))),
|
||||
'len_of_bcd'/Int8ub,
|
||||
'ton_npi'/TonNpi,
|
||||
'dialing_nr'/ExtendedBcdAdapter(PaddedBcdAdapter(Rpad(Bytes(10)))),
|
||||
'dialing_nr'/ExtendedBcdAdapter(BcdAdapter(Rpad(Bytes(10)))),
|
||||
Padding(2, pattern=b'\xff'))
|
||||
|
||||
# Maintain compatibility with deprecated representations
|
||||
@@ -240,20 +239,11 @@ class EF_MSISDN(LinFixedEF):
|
||||
|
||||
# TS 51.011 Section 10.5.6
|
||||
class EF_SMSP(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '534d5343ffffffffffffffffffffffffe1ffffffffffffffffffffffff0891945197109099f9ffffff0000a9',
|
||||
{ "alpha_id": "SMSC", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
|
||||
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
|
||||
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension" },
|
||||
"call_number": "" },
|
||||
"tp_sc_addr": { "length": 8, "ton_npi": { "ext": True, "type_of_number": "international",
|
||||
"numbering_plan_id": "isdn_e164" },
|
||||
"call_number": "4915790109999" },
|
||||
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
|
||||
# FIXME: re-encode fails / missing alpha_id at start of output
|
||||
_test_decode = [
|
||||
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
|
||||
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
|
||||
"tp_pid": True, "tp_dcs": True, "tp_vp": False },
|
||||
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
|
||||
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension" },
|
||||
"call_number": "" },
|
||||
@@ -277,36 +267,22 @@ class EF_SMSP(LinFixedEF):
|
||||
raise ValueError
|
||||
def _encode(self, obj, context, path):
|
||||
if obj <= 12*60:
|
||||
return obj // 5 - 1
|
||||
return obj/5 - 1
|
||||
elif obj <= 24*60:
|
||||
return 143 + ((obj - (12 * 60)) // 30)
|
||||
elif obj <= 30 * 24 * 60:
|
||||
return 166 + (obj // (24 * 60))
|
||||
return 166 + (obj / (24 * 60))
|
||||
elif obj <= 63 * 7 * 24 * 60:
|
||||
return 192 + (obj // (7 * 24 * 60))
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
@staticmethod
|
||||
def sc_addr_len(ctx):
|
||||
"""Compute the length field for an address field (like TP-DestAddr or TP-ScAddr)."""
|
||||
if not hasattr(ctx, 'call_number') or len(ctx.call_number) == 0:
|
||||
return 0xff
|
||||
else:
|
||||
return bytes_for_nibbles(len(ctx.call_number)) + 1
|
||||
|
||||
def __init__(self, fid='6f42', sfid=None, name='EF.SMSP', desc='Short message service parameters', **kwargs):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(28, None), **kwargs)
|
||||
ScAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.sc_addr_len(ctx)),
|
||||
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
|
||||
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
|
||||
'parameter_indicators'/InvertAdapter(BitStruct(
|
||||
Const(7, BitsInteger(3)),
|
||||
'tp_vp'/Flag,
|
||||
'tp_dcs'/Flag,
|
||||
'tp_pid'/Flag,
|
||||
'tp_sc_addr'/Flag,
|
||||
'tp_dest_addr'/Flag)),
|
||||
ScAddr = Struct('length'/Int8ub, 'ton_npi'/TonNpi, 'call_number'/BcdAdapter(Rpad(Bytes(10))))
|
||||
self._construct = Struct('alpha_id'/COptional(GsmStringAdapter(Rpad(Bytes(this._.total_len-28)))),
|
||||
'parameter_indicators'/InvertAdapter(FlagsEnum(Byte, tp_dest_addr=1, tp_sc_addr=2,
|
||||
tp_pid=3, tp_dcs=4, tp_vp=5)),
|
||||
'tp_dest_addr'/ScAddr,
|
||||
'tp_sc_addr'/ScAddr,
|
||||
|
||||
@@ -661,12 +637,12 @@ class EF_AD(TransparentEF):
|
||||
# TS 51.011 Section 10.3.20 / 10.3.22
|
||||
class EF_VGCS(TransRecEF):
|
||||
_test_de_encode = [
|
||||
( "92f9ffff", "299" ),
|
||||
( "92f9ffff", "299fffff" ),
|
||||
]
|
||||
def __init__(self, fid='6fb1', sfid=None, name='EF.VGCS', size=(4, 200), rec_len=4,
|
||||
desc='Voice Group Call Service', **kwargs):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len, **kwargs)
|
||||
self._construct = PaddedBcdAdapter(Rpad(Bytes(4)))
|
||||
self._construct = BcdAdapter(Bytes(4))
|
||||
|
||||
# TS 51.011 Section 10.3.21 / 10.3.23
|
||||
class EF_VGCSS(TransparentEF):
|
||||
|
||||
@@ -526,13 +526,6 @@ def expand_hex(hexstring, length):
|
||||
# no change
|
||||
return hexstring
|
||||
|
||||
def bytes_for_nibbles(num_nibbles: int) -> int:
|
||||
"""compute the number of bytes needed to store the given number of nibbles."""
|
||||
n_bytes = num_nibbles // 2
|
||||
if num_nibbles & 1:
|
||||
n_bytes += 1
|
||||
return n_bytes
|
||||
|
||||
|
||||
def boxed_heading_str(heading, width=80):
|
||||
"""Generate a string that contains a boxed heading."""
|
||||
@@ -631,17 +624,15 @@ def decomposeATR(atr_txt):
|
||||
Returns:
|
||||
dictionary of field and values
|
||||
|
||||
Example::
|
||||
|
||||
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
|
||||
{ 'T0': {'value': 167},
|
||||
'TB': {1: {'value': 0}},
|
||||
'TC': {2: {'value': 24}},
|
||||
'TD': {1: {'value': 64}},
|
||||
'TS': {'value': 59},
|
||||
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
|
||||
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
|
||||
'hbn': 7}
|
||||
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
|
||||
{ 'T0': {'value': 167},
|
||||
'TB': {1: {'value': 0}},
|
||||
'TC': {2: {'value': 24}},
|
||||
'TD': {1: {'value': 64}},
|
||||
'TS': {'value': 59},
|
||||
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
|
||||
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
|
||||
'hbn': 7}
|
||||
"""
|
||||
ATR_PROTOCOL_TYPE_T0 = 0
|
||||
atr_txt = normalizeATR(atr_txt)
|
||||
@@ -1118,3 +1109,9 @@ class CardCommandSet:
|
||||
if cla and not cmd.match_cla(cla):
|
||||
return None
|
||||
return cmd
|
||||
|
||||
|
||||
def all_subclasses_of(cls):
|
||||
for subc in cls.__subclasses__():
|
||||
yield subc
|
||||
yield from all_subclasses_of(subc)
|
||||
|
||||
2
pylint.sh
Executable file
2
pylint.sh
Executable file
@@ -0,0 +1,2 @@
|
||||
#!/bin/sh
|
||||
python3 -m pylint -j0 --errors-only --disable E1102 --disable E0401 --enable W0301 pySim
|
||||
@@ -5,7 +5,7 @@ cmd2>=2.6.2,<3.0
|
||||
jsonpath-ng
|
||||
construct>=2.10.70
|
||||
bidict
|
||||
pyosmocom>=0.0.12
|
||||
pyosmocom>=0.0.9
|
||||
pyyaml>=5.1
|
||||
termcolor
|
||||
colorlog
|
||||
@@ -15,4 +15,3 @@ git+https://github.com/osmocom/asn1tools
|
||||
packaging
|
||||
git+https://github.com/hologram-io/smpp.pdu
|
||||
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
||||
smpplib
|
||||
|
||||
8
ruff.toml
Normal file
8
ruff.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[lint]
|
||||
ignore = [
|
||||
"E741",
|
||||
|
||||
"F403",
|
||||
"F405",
|
||||
"E713",
|
||||
]
|
||||
6
setup.py
6
setup.py
@@ -25,7 +25,7 @@ setup(
|
||||
"jsonpath-ng",
|
||||
"construct >= 2.10.70",
|
||||
"bidict",
|
||||
"pyosmocom >= 0.0.12",
|
||||
"pyosmocom >= 0.0.9",
|
||||
"pyyaml >= 5.1",
|
||||
"termcolor",
|
||||
"colorlog",
|
||||
@@ -55,10 +55,6 @@ setup(
|
||||
"service-identity",
|
||||
"pyopenssl",
|
||||
"requests",
|
||||
"smpplib",
|
||||
],
|
||||
"CardKeyProviderPgsql": [
|
||||
"psycopg2-binary",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
@@ -2200,9 +2200,9 @@ update_record 6 fe0112ffb53e96e5ff99731d51ad7beafd0e23ffffffffffffffffffffffffff
|
||||
update_record 7 fe02101da012f436d06824ecdd15050419ff9affffffffffffffffffffffffffffffff
|
||||
update_record 8 fe02116929a373388ac904aff57ff57f6b3431ffffffffffffffffffffffffffffffff
|
||||
update_record 9 fe0212a99245a5dc814e2f4c1aa908e9946e03ffffffffffffffffffffffffffffffff
|
||||
update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111
|
||||
update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222
|
||||
update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333
|
||||
update_record 10 fe0310521312c05a9aea93d70d44405172a580ffffffffffffffffffffffffffffffff
|
||||
update_record 11 fe0311a9e45c72d45abde7db74261ee0c11b1bffffffffffffffffffffffffffffffff
|
||||
update_record 12 fe0312867ba36b5873d60ea8b2cdcf3c0ddddaffffffffffffffffffffffffffffffff
|
||||
#
|
||||
################################################################################
|
||||
# MF/DF.SYSTEM/EF.SIM_AUTH_COUNTER #
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000111
|
||||
GID1: ffffffffffffffff
|
||||
GID2: ffffffffffffffff
|
||||
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
|
||||
SMSC: 0015555
|
||||
SPN: Fairwaves
|
||||
Show in HPLMN: False
|
||||
Hide in OPLMN: False
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
||||
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
|
||||
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
|
||||
SMSC: 0015555
|
||||
SPN: wavemobile
|
||||
Show in HPLMN: False
|
||||
Hide in OPLMN: False
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
|
||||
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
|
||||
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||
SMSC: 0015555
|
||||
SPN: Magic
|
||||
Show in HPLMN: True
|
||||
Hide in OPLMN: False
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Utility to verify the functionality of pySim-prog.py
|
||||
#
|
||||
# (C) 2018 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2018 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: ffffffffffffffffffff
|
||||
GID2: ffffffffffffffffffff
|
||||
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||
SMSC: 0015555
|
||||
SPN: Magic
|
||||
Show in HPLMN: True
|
||||
Hide in OPLMN: True
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: ffffffffffffffffffff
|
||||
GID2: ffffffffffffffffffff
|
||||
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||
SMSC: 0015555
|
||||
SPN: Magic
|
||||
Show in HPLMN: True
|
||||
Hide in OPLMN: True
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: ffffffffffffffffffff
|
||||
GID2: ffffffffffffffffffff
|
||||
SMSP: ffffffffffffffffffffffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||
SMSC: 0015555
|
||||
SPN: Magic
|
||||
Show in HPLMN: True
|
||||
Hide in OPLMN: True
|
||||
|
||||
@@ -6,7 +6,6 @@ IMSI: 001010000000102
|
||||
GID1: Can't read file -- SW match failed! Expected 9000 and got 9404.
|
||||
GID2: Can't read file -- SW match failed! Expected 9000 and got 9404.
|
||||
SMSP: ffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
|
||||
SMSC: 0015555
|
||||
SPN: Not available
|
||||
Show in HPLMN: False
|
||||
Hide in OPLMN: False
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
},
|
||||
{
|
||||
"profile_info": {
|
||||
"iccid": "8949449999999990031",
|
||||
"iccid": "8949449999999990031f",
|
||||
"isdp_aid": "a0000005591010ffffffff8900001200",
|
||||
"profile_state": "disabled",
|
||||
"service_provider_name": "OsmocomSPN",
|
||||
|
||||
@@ -23,7 +23,7 @@ import os
|
||||
import json
|
||||
from utils import *
|
||||
|
||||
# This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031)
|
||||
# This testcase requires a sysmoEUICC1-C2T with the test prfile TS48V1-B-UNIQUE (ICCID 8949449999999990031f)
|
||||
# installed, and in disabled state. Also the profile must be installed in such a way that notifications are
|
||||
# generated when the profile is disabled or enabled (ProfileMetadata)
|
||||
|
||||
@@ -56,7 +56,7 @@ class test_case(UnittestUtils):
|
||||
self.runPySimShell(cardname, "test_enable_disable_profile.script")
|
||||
self.assertEqualFiles("enable_disable_profile.tmp")
|
||||
|
||||
def test_set_nickname(self):
|
||||
def test_enable_disable_profile(self):
|
||||
cardname = 'sysmoEUICC1-C2T'
|
||||
|
||||
self.runPySimShell(cardname, "test_set_nickname.script")
|
||||
|
||||
@@ -3,9 +3,6 @@ set echo true
|
||||
|
||||
select ADF.ISD-R
|
||||
|
||||
# Ensure that the test-profile we intend to test with is actually enabled
|
||||
enable_profile --iccid 89000123456789012341
|
||||
|
||||
# by ICCID (pre-installed test profile on sysmoEUICC1-C2T)
|
||||
disable_profile --iccid 89000123456789012341 > enable_disable_profile.tmp
|
||||
enable_profile --iccid 89000123456789012341 >> enable_disable_profile.tmp
|
||||
|
||||
@@ -3,11 +3,6 @@ set echo true
|
||||
|
||||
select ADF.ISD-R
|
||||
|
||||
# Ensure that the test-profile is actually enabled. (In case te test-profile
|
||||
# was disabled, a notification may be generated. The testcase should tolerate
|
||||
# that)
|
||||
enable_profile --iccid 89000123456789012341
|
||||
|
||||
# Generate two (additional) notifications by quickly enabeling the test profile
|
||||
enable_profile --iccid 8949449999999990031
|
||||
enable_profile --iccid 8949449999999990031f
|
||||
enable_profile --iccid 89000123456789012341
|
||||
|
||||
@@ -1,10 +1,5 @@
|
||||
set debug true
|
||||
set echo true
|
||||
|
||||
# The output of get_profiles_info will also include the "profile_state", which
|
||||
# can be either "enabled" or "disabled". Ensure that the correct profile is
|
||||
# enabled.
|
||||
enable_profile --iccid 89000123456789012341
|
||||
|
||||
select ADF.ISD-R
|
||||
get_profiles_info > get_profiles_info.tmp
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
"type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension"
|
||||
},
|
||||
"dialing_nr": "1234567",
|
||||
"dialing_nr": "123456",
|
||||
"cap_conf_id": 42,
|
||||
"ext4_record_id": 23
|
||||
},
|
||||
@@ -67,7 +67,7 @@
|
||||
"type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension"
|
||||
},
|
||||
"dialing_nr": "1234567",
|
||||
"dialing_nr": "123456",
|
||||
"cap_conf_id": 42,
|
||||
"ext4_record_id": 23
|
||||
},
|
||||
@@ -127,7 +127,7 @@
|
||||
"type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension"
|
||||
},
|
||||
"dialing_nr": "1234567",
|
||||
"dialing_nr": "123456",
|
||||
"cap_conf_id": 42,
|
||||
"ext4_record_id": 23
|
||||
}
|
||||
@@ -140,7 +140,7 @@
|
||||
"type_of_number": "reserved_for_extension",
|
||||
"numbering_plan_id": "reserved_for_extension"
|
||||
},
|
||||
"dialing_nr": "1234567",
|
||||
"dialing_nr": "123456",
|
||||
"cap_conf_id": 42,
|
||||
"ext4_record_id": 23
|
||||
}
|
||||
|
||||
@@ -1,216 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Utility to verify the functionality of pySim-trace.py
|
||||
#
|
||||
# (C) 2026 by sysmocom - s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
PYSIM_SHELL=./pySim-shell.py
|
||||
PYSIM_SHELL_LOG=./pySim-shell.log
|
||||
PYSIM_SMPP2SIM=./pySim-smpp2sim.py
|
||||
PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log
|
||||
PYSIM_SMPP2SIM_PORT=2775
|
||||
PYSIM_SMPP2SIM_TIMEOUT=10
|
||||
PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py
|
||||
PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log
|
||||
|
||||
function dump_logs {
|
||||
echo ""
|
||||
echo "$PYSIM_SMPPOTATOOL_LOG"
|
||||
echo "------------8<------------"
|
||||
cat $PYSIM_SMPPOTATOOL_LOG
|
||||
echo "------------8<------------"
|
||||
echo ""
|
||||
echo "$PYSIM_SMPP2SIM_LOG"
|
||||
echo "------------8<------------"
|
||||
cat $PYSIM_SMPP2SIM_LOG
|
||||
echo "------------8<------------"
|
||||
}
|
||||
|
||||
function send_test_request {
|
||||
echo ""
|
||||
echo "Sending request to SMPP server:"
|
||||
C_APDU=$1
|
||||
R_APDU_EXPECTED=$2
|
||||
|
||||
echo "Sending: $C_APDU"
|
||||
COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --kic-idx $KEY_INDEX --kid-idx $KEY_INDEX --algo-crypt $ALGO_CRYPT --algo-auth $ALGO_AUTH --tar $TAR --apdu $C_APDU"
|
||||
echo "Commandline: $COMMANDLINE"
|
||||
R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG`
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Unable to send request! -- failed!"
|
||||
dump_logs
|
||||
exit 1
|
||||
fi
|
||||
echo ""
|
||||
echo "Got response from SMPP server:"
|
||||
echo "Sent: $C_APDU"
|
||||
echo "Received: $R_APDU"
|
||||
echo "Expected: $R_APDU_EXPECTED"
|
||||
if [ "$R_APDU" != "$R_APDU_EXPECTED" ]; then
|
||||
echo "Response does not match the expected response! -- failed!"
|
||||
dump_logs
|
||||
exit 1
|
||||
fi
|
||||
echo "Response matches the expected response -- success!"
|
||||
}
|
||||
|
||||
function start_smpp_server {
|
||||
PCSC_READER=$1
|
||||
echo ""
|
||||
echo "Starting SMPP server:"
|
||||
|
||||
# Start the SMPP server
|
||||
COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port $PYSIM_SMPP2SIM_PORT --apdu-trace"
|
||||
echo "Commandline: $COMMANDLINE"
|
||||
$COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 &
|
||||
PYSIM_SMPP2SIM_PID=$!
|
||||
trap 'kill $PYSIM_SMPP2SIM_PID' EXIT
|
||||
echo "SMPP server started (PID=$PYSIM_SMPP2SIM_PID)"
|
||||
|
||||
# Wait until the SMPP server is reachable
|
||||
RC=1
|
||||
RETRY_COUNT=0
|
||||
while [ $RC -ne 0 ]; do
|
||||
nc -z localhost $PYSIM_SMPP2SIM_PORT
|
||||
RC=$?
|
||||
((RETRY_COUNT++))
|
||||
if [ $RETRY_COUNT -gt $PYSIM_SMPP2SIM_TIMEOUT ]; then
|
||||
echo "SMPP server not reachable (port=$PYSIM_SMPP2SIM_PORT) -- abort"
|
||||
dump_logs
|
||||
exit 1
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)"
|
||||
}
|
||||
|
||||
function stop_smpp_server {
|
||||
echo ""
|
||||
echo "Stopping SMPP server:"
|
||||
kill $PYSIM_SMPP2SIM_PID
|
||||
echo "SMPP server stopped (PID=$PYSIM_SMPP2SIM_PID)"
|
||||
trap EXIT
|
||||
}
|
||||
|
||||
function find_card_by_iccid_or_eid {
|
||||
ICCID=$1
|
||||
EID=$2
|
||||
echo ""
|
||||
echo "Searching for card:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
if [ -n "$EID" ]; then
|
||||
echo "EID: \"$EID\""
|
||||
fi
|
||||
|
||||
# Determine number of available PCSC readers
|
||||
PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
|
||||
|
||||
# In case an EID is set, search for a card with that EID first
|
||||
if [ -n "$EID" ]; then
|
||||
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
|
||||
echo "probing card (eID) in reader $PCSC_READER ..."
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "get_eid" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep $EID > /dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Found card (eID) in reader $PCSC_READER"
|
||||
return $PCSC_READER
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
# Search for card with the given ICCID
|
||||
if [ -z "$ICCID" ]; then
|
||||
echo "invalid ICCID, zero length ICCID is not allowed! -- abort"
|
||||
exit 1
|
||||
fi
|
||||
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
|
||||
echo "probing card (ICCID) in reader $PCSC_READER ..."
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep $ICCID > /dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Found card (by ICCID) in reader $PCSC_READER"
|
||||
return $PCSC_READER
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Card not found -- abort"
|
||||
exit 1
|
||||
}
|
||||
|
||||
function enable_profile {
|
||||
PCSC_READER=$1
|
||||
ICCID=$2
|
||||
EID=$3
|
||||
if [ -z "$EID" ]; then
|
||||
# This is no eUICC, nothing to enable
|
||||
return 0
|
||||
fi
|
||||
|
||||
# Check if the profile is already enabled
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
|
||||
ICCID_ENABLED=`echo $RESULT_JSON | jq -r '.iccid'`
|
||||
if [ $ICCID != $ICCID_ENABLED ]; then
|
||||
# Disable the currentle enabled profile
|
||||
echo ""
|
||||
echo "Disabeling currently enabled profile:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "disable_profile --iccid $ICCID_ENABLED" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep "ok" > /dev/null
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "unable to disable profile with \"$ICCID_ENABLED\""
|
||||
exit 1
|
||||
fi
|
||||
echo "profile disabled"
|
||||
|
||||
# Enable the profile we intend to test with
|
||||
echo ""
|
||||
echo "Enabeling profile:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "enable_profile --iccid $ICCID" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep "ok\|profileNotInDisabledState" > /dev/null
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "unable to enable profile with \"$ICCID\""
|
||||
exit 1
|
||||
fi
|
||||
echo "profile enabled"
|
||||
fi
|
||||
}
|
||||
|
||||
export PYTHONPATH=./
|
||||
|
||||
echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py"
|
||||
echo "=============================================================="
|
||||
|
||||
TESTCASE_DIR=`dirname $0`
|
||||
for TEST_CONFIG_FILE in $TESTCASE_DIR/testcase_*.cfg ; do
|
||||
echo ""
|
||||
echo "running testcase: $TEST_CONFIG_FILE"
|
||||
. $TEST_CONFIG_FILE
|
||||
find_card_by_iccid_or_eid $ICCID $EID
|
||||
PCSC_READER=$?
|
||||
enable_profile $PCSC_READER $ICCID $EID
|
||||
start_smpp_server $PCSC_READER
|
||||
send_test_request $APDU "$EXPECTED_RESPONSE"
|
||||
stop_smpp_server
|
||||
echo ""
|
||||
echo "testcase ok"
|
||||
echo "--------------------------------------------------------------"
|
||||
done
|
||||
|
||||
echo "done."
|
||||
@@ -1,17 +0,0 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoISIM-SJA5 card. For the testcase, the
|
||||
# key configuration on the card may be used as it is.
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
|
||||
EID=""
|
||||
KIC='51D4FC44BCBA7C4589DFADA3297720AF' # <-- change to the KIC1 of your card!
|
||||
KID='0449699C472CE71E2FB7B56245EF7684' # <-- change to the KID1 of your card!
|
||||
KEY_INDEX=1
|
||||
ALGO_CRYPT=triple_des_cbc2
|
||||
ALGO_AUTH=triple_des_cbc2
|
||||
TAR='B00010'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
|
||||
APDU='A0A40000027F20A0C0000016'
|
||||
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
|
||||
@@ -1,19 +0,0 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoEUICC1-C2T, which is equipped with the
|
||||
# TS48V1-B-UNIQUE test profile from https://test.rsp.sysmocom.de/ (Activation
|
||||
# code: 1$smdpp.test.rsp.sysmocom.de$TS48V1-B-UNIQUE). This testprofile must be
|
||||
# present on the eUICC before this testcase can be executed.
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949449999999990031"
|
||||
EID="89049044900000000000000000102355" # <-- change to the EID of your card!
|
||||
KIC='66778899aabbccdd1122334455eeff10'
|
||||
KID='112233445566778899aabbccddeeff10'
|
||||
KEY_INDEX=2
|
||||
ALGO_CRYPT=aes_cbc
|
||||
ALGO_AUTH=aes_cmac
|
||||
TAR='b00120'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.ICCID and returns the select response
|
||||
APDU='00a40004022fe200C000001d'
|
||||
EXPECTED_RESPONSE='621b8202412183022fe2a503d001408a01058b032f06038002000a8800 9000'
|
||||
@@ -1,28 +0,0 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoISIM-SJA5 card. Since this card model is
|
||||
# shipped with a classic DES key configuration, it is necessary to provision
|
||||
# AES128 test keys before this testcase may be executed. The the following
|
||||
# pySim-shell command sequence may be used:
|
||||
#
|
||||
# verify_adm 34173960 # <-- change to the ADM key of your card!
|
||||
# select /DF.SYSTEM/EF.0348_KEY
|
||||
# update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111
|
||||
# update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222
|
||||
# update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333
|
||||
#
|
||||
# This overwrites one of the already existing 3DES SCP02 key (KVN 47) and replaces it
|
||||
# with an AES256 SCP80 key (KVN 3).
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
|
||||
EID=""
|
||||
KIC='1111111111111111111111111111111111111111111111111111111111111111'
|
||||
KID='2222222222222222222222222222222222222222222222222222222222222222'
|
||||
KEY_INDEX=3
|
||||
ALGO_CRYPT=aes_cbc
|
||||
ALGO_AUTH=aes_cmac
|
||||
TAR='B00010'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
|
||||
APDU='A0A40000027F20A0C0000016'
|
||||
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Utility to verify the functionality of pySim-trace.py
|
||||
#
|
||||
# (C) 2023 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2023 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier
|
||||
|
||||
@@ -143,7 +143,7 @@ CardReset(3b9f96801f878031e073fe211b674a4c753034054ba9)
|
||||
===============================
|
||||
00 SEARCH RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [2], "sw": "9000"}}
|
||||
===============================
|
||||
00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_vp": true, "tp_dcs": true, "tp_pid": true, "tp_sc_addr": true, "tp_dest_addr": false}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5}
|
||||
00 READ RECORD MF/ADF.USIM/EF.SMSP 01 9000 {"alpha_id": "", "parameter_indicators": {"tp_dest_addr": false, "tp_sc_addr": true, "tp_pid": true, "tp_dcs": true, "tp_vp": true}, "tp_dest_addr": {"length": 255, "ton_npi": {"ext": true, "type_of_number": "reserved_for_extension", "numbering_plan_id": "reserved_for_extension"}, "call_number": ""}, "tp_sc_addr": {"length": 5, "ton_npi": {"ext": true, "type_of_number": "unknown", "numbering_plan_id": "isdn_e164"}, "call_number": "0015555f"}, "tp_pid": "00", "tp_dcs": "00", "tp_vp_minutes": 5}
|
||||
===============================
|
||||
00 SEARCH RECORD MF/ADF.USIM/EF.SMS 01 9000 {"cmd": {"file": "currently_selected_ef", "mode": "forward_search", "record_number": 1, "search_string": "00ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"}, "rsp": {"body": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], "sw": "9000"}}
|
||||
===============================
|
||||
|
||||
1
tests/unittests/smdpp_data
Symbolic link
1
tests/unittests/smdpp_data
Symbolic link
@@ -0,0 +1 @@
|
||||
../../smdpp-data
|
||||
399
tests/unittests/test_configurable_parameters.py
Executable file
399
tests/unittests/test_configurable_parameters.py
Executable file
@@ -0,0 +1,399 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import sys
|
||||
import unittest
|
||||
import io
|
||||
from importlib import resources
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.esim.saip import ProfileElementSequence
|
||||
import pySim.esim.saip.personalization as p13n
|
||||
import smdpp_data.upp
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
def valstr(val):
|
||||
if isinstance(val, io.BytesIO):
|
||||
val = val.getvalue()
|
||||
if isinstance(val, bytearray):
|
||||
val = bytes(val)
|
||||
return f'{val!r}'
|
||||
|
||||
def valtypestr(val):
|
||||
if isinstance(val, dict):
|
||||
types = []
|
||||
for v in val.values():
|
||||
types.append(f'{type(v).__name__}')
|
||||
|
||||
val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
val_type = f'{type(val).__name__}'
|
||||
return f'{valstr(val)}:{val_type}'
|
||||
|
||||
class D:
|
||||
mandatory = set()
|
||||
optional = set()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for k in self.optional:
|
||||
if not hasattr(self, k):
|
||||
setattr(self, k, None)
|
||||
|
||||
class ConfigurableParameterTest(unittest.TestCase):
|
||||
|
||||
def test_parameters(self):
|
||||
|
||||
upp_fnames = (
|
||||
'TS48v5_SAIP2.1A_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
|
||||
'TS48v5_SAIP2.1B_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_NoBERTLV.der',
|
||||
)
|
||||
|
||||
class Paramtest(D):
|
||||
mandatory = (
|
||||
'param_cls',
|
||||
'val',
|
||||
'expect_val',
|
||||
)
|
||||
optional = (
|
||||
'expect_clean_val',
|
||||
)
|
||||
|
||||
param_tests = [
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456',
|
||||
expect_clean_val=str('123456'),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
|
||||
expect_clean_val=str('123456789012345'),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk2,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(1234),
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='Milenage',
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='TUAK',
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='usim-test',
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=1,
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=2,
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=3,
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=int(11020304050607080910111213141516),
|
||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='11020304050607080910111213141516'),
|
||||
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
]
|
||||
|
||||
for sdkey_cls in (
|
||||
# thin out the number of tests, as a compromise between completeness and test runtime
|
||||
p13n.SdKeyScp80Kvn01Enc,
|
||||
#p13n.SdKeyScp80Kvn01Dek,
|
||||
#p13n.SdKeyScp80Kvn01Mac,
|
||||
#p13n.SdKeyScp80Kvn02Enc,
|
||||
p13n.SdKeyScp80Kvn02Dek,
|
||||
#p13n.SdKeyScp80Kvn02Mac,
|
||||
#p13n.SdKeyScp81Kvn81Enc,
|
||||
#p13n.SdKeyScp81Kvn81Dek,
|
||||
p13n.SdKeyScp81Kvn81Mac,
|
||||
#p13n.SdKeyScp81Kvn82Enc,
|
||||
#p13n.SdKeyScp81Kvn82Dek,
|
||||
#p13n.SdKeyScp81Kvn82Mac,
|
||||
p13n.SdKeyScp81Kvn83Enc,
|
||||
#p13n.SdKeyScp81Kvn83Dek,
|
||||
#p13n.SdKeyScp81Kvn83Mac,
|
||||
#p13n.SdKeyScp02Kvn20Enc,
|
||||
p13n.SdKeyScp02Kvn20Dek,
|
||||
#p13n.SdKeyScp02Kvn20Mac,
|
||||
#p13n.SdKeyScp02Kvn21Enc,
|
||||
#p13n.SdKeyScp02Kvn21Dek,
|
||||
p13n.SdKeyScp02Kvn21Mac,
|
||||
#p13n.SdKeyScp02Kvn22Enc,
|
||||
#p13n.SdKeyScp02Kvn22Dek,
|
||||
#p13n.SdKeyScp02Kvn22Mac,
|
||||
p13n.SdKeyScp02KvnffEnc,
|
||||
#p13n.SdKeyScp02KvnffDek,
|
||||
#p13n.SdKeyScp02KvnffMac,
|
||||
#p13n.SdKeyScp03Kvn30Enc,
|
||||
p13n.SdKeyScp03Kvn30Dek,
|
||||
#p13n.SdKeyScp03Kvn30Mac,
|
||||
#p13n.SdKeyScp03Kvn31Enc,
|
||||
#p13n.SdKeyScp03Kvn31Dek,
|
||||
p13n.SdKeyScp03Kvn31Mac,
|
||||
#p13n.SdKeyScp03Kvn32Enc,
|
||||
#p13n.SdKeyScp03Kvn32Dek,
|
||||
#p13n.SdKeyScp03Kvn32Mac,
|
||||
):
|
||||
|
||||
param_tests.extend([
|
||||
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='010203040506070809101112131415161718192021222324',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||
b'\x17\x18\x19\x20\x21\x22\x23\x24',
|
||||
expect_val='010203040506070809101112131415161718192021222324'),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val='0102030405060708091011121314151617181920212223242526272829303132',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32',
|
||||
expect_val='0102030405060708091011121314151617181920212223242526272829303132'),
|
||||
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516',
|
||||
),
|
||||
Paramtest(param_cls=sdkey_cls,
|
||||
val=11020304050607080910111213141516,
|
||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='11020304050607080910111213141516',
|
||||
),
|
||||
])
|
||||
|
||||
outputs = []
|
||||
|
||||
for upp_fname in upp_fnames:
|
||||
test_idx = -1
|
||||
try:
|
||||
|
||||
der = resources.read_binary(smdpp_data.upp, upp_fname)
|
||||
|
||||
for t in param_tests:
|
||||
test_idx += 1
|
||||
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
|
||||
|
||||
param = None
|
||||
try:
|
||||
param = t.param_cls()
|
||||
param.input_value = t.val
|
||||
param.validate()
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc}: {e}') from e
|
||||
|
||||
clean_val = param.value
|
||||
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
|
||||
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
|
||||
raise ValueError(f'{logloc}: expected'
|
||||
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
|
||||
|
||||
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
|
||||
# pes = copy.deepcopy(orig_pes)
|
||||
pes = ProfileElementSequence.from_der(der)
|
||||
try:
|
||||
param.apply(pes)
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
|
||||
|
||||
changed_der = pes.to_der()
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(changed_der)
|
||||
|
||||
read_back_val = t.param_cls.get_value_from_pes(pes2)
|
||||
|
||||
# compose log string to show the precise type of dict values
|
||||
if isinstance(read_back_val, dict):
|
||||
types = set()
|
||||
for v in read_back_val.values():
|
||||
types.add(f'{type(v).__name__}')
|
||||
|
||||
read_back_val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
read_back_val_type = f'{type(read_back_val).__name__}'
|
||||
|
||||
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
|
||||
|
||||
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
|
||||
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
|
||||
|
||||
expect_val = t.expect_val
|
||||
if not isinstance(expect_val, dict):
|
||||
expect_val = { t.param_cls.get_name(): expect_val }
|
||||
if read_back_val != expect_val:
|
||||
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
|
||||
|
||||
ok = logloc.replace(' clean_val', '\n\tclean_val'
|
||||
).replace(' read_back_val', '\n\tread_back_val'
|
||||
).replace('=', '=\t'
|
||||
)
|
||||
output = f'\nok: {ok}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_configurable_parameters'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
@@ -63,6 +63,44 @@ class SaipTest(unittest.TestCase):
|
||||
# TODO: we don't actually test the results here, but we just verify there is no exception
|
||||
pes.to_der()
|
||||
|
||||
def test_personalization2(self):
|
||||
"""Test some of the personalization operations."""
|
||||
pes = ProfileElementSequence.from_der(self.per_input)
|
||||
prev_val = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{prev_val=}')
|
||||
self.assertTrue(prev_val)
|
||||
|
||||
set_val = '42342342342342342342342342342342'
|
||||
param = SdKeyScp80_01Kic(set_val)
|
||||
param.validate()
|
||||
param.apply(pes)
|
||||
|
||||
get_val1 = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1=} {set_val=}')
|
||||
self.assertEqual(get_val1, set((set_val,)))
|
||||
|
||||
get_val1b = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1b=} {set_val=}')
|
||||
self.assertEqual(get_val1b, set((set_val,)))
|
||||
|
||||
print("HELLOO")
|
||||
der = pes.to_der()
|
||||
print("DONEDONE")
|
||||
|
||||
get_val1c = set(SdKeyScp80_01Kic.get_values_from_pes(pes))
|
||||
print(f'{get_val1c=} {set_val=}')
|
||||
self.assertEqual(get_val1c, set((set_val,)))
|
||||
|
||||
# assertTrue to not dump the entire der.
|
||||
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
|
||||
# DER and the ConfigurableParameter subclass is buggy.
|
||||
self.assertTrue(der != self.per_input)
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(der)
|
||||
get_val2 = set(SdKeyScp80_01Kic.get_values_from_pes(pes2))
|
||||
print(f'{get_val2=} {set_val=}')
|
||||
self.assertEqual(get_val2, set((set_val,)))
|
||||
|
||||
def test_constructor_encode(self):
|
||||
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
||||
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
||||
@@ -90,34 +128,5 @@ class OidTest(unittest.TestCase):
|
||||
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
|
||||
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
|
||||
|
||||
class NonMatchTest(unittest.TestCase):
|
||||
def test_nonmatch(self):
|
||||
# non-matches before, in between and after matches
|
||||
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 26)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
|
||||
NonMatch(a=24, b=24, size=2)])
|
||||
|
||||
def test_nonmatch_beg(self):
|
||||
# single match at beginning
|
||||
match_list = [Match(a=0, b=0, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 20)
|
||||
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
|
||||
|
||||
def test_nonmatch_end(self):
|
||||
# single match at end
|
||||
match_list = [Match(a=19, b=19, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
|
||||
|
||||
def test_nonmatch_none(self):
|
||||
# no match at all
|
||||
match_list = []
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -96,7 +96,7 @@ class LinFixed_Test(unittest.TestCase):
|
||||
inst = c()
|
||||
encoded, rec_num, decoded = self._parse_t(t)
|
||||
logging.debug("Testing encode of %s", name)
|
||||
re_enc = inst.encode_record_hex(decoded, rec_num, len(encoded)//2)
|
||||
re_enc = inst.encode_record_hex(decoded, rec_num)
|
||||
self.assertEqual(encoded.upper(), re_enc.upper())
|
||||
|
||||
def test_de_encode_record(self):
|
||||
@@ -122,7 +122,7 @@ class LinFixed_Test(unittest.TestCase):
|
||||
self.assertEqual(decoded, re_dec)
|
||||
# re-encode the decoded data
|
||||
logging.debug("Testing re-encode of %s", name)
|
||||
re_enc = inst.encode_record_hex(re_dec, rec_num, len(encoded)//2)
|
||||
re_enc = inst.encode_record_hex(re_dec, rec_num)
|
||||
self.assertEqual(encoded.upper(), re_enc.upper())
|
||||
if hasattr(c, '_test_no_pad') and c._test_no_pad:
|
||||
continue
|
||||
@@ -196,7 +196,7 @@ class TransRecEF_Test(unittest.TestCase):
|
||||
self.assertEqual(decoded, re_dec)
|
||||
# re-encode the decoded data
|
||||
logging.debug("Testing re-encode of %s", name)
|
||||
re_enc = inst.encode_record_hex(re_dec, len(encoded)//2)
|
||||
re_enc = inst.encode_record_hex(re_dec)
|
||||
self.assertEqual(encoded.upper(), re_enc.upper())
|
||||
# there's no point in testing padded input, as TransRecEF have a fixed record
|
||||
# size and we cannot ever receive more input data than that size.
|
||||
@@ -256,8 +256,8 @@ class TransparentEF_Test(unittest.TestCase):
|
||||
encoded = t[0]
|
||||
decoded = t[1]
|
||||
logging.debug("Testing encode of %s", name)
|
||||
re_enc = inst.encode_hex(decoded, len(encoded)//2)
|
||||
self.assertEqual(encoded, re_enc)
|
||||
re_dec = inst.decode_hex(encoded)
|
||||
self.assertEqual(decoded, re_dec)
|
||||
|
||||
def test_de_encode_file(self):
|
||||
"""Test the decoder and encoder for a transparent EF. Performs first a decoder
|
||||
@@ -280,7 +280,7 @@ class TransparentEF_Test(unittest.TestCase):
|
||||
self.assertEqual(decoded, re_dec)
|
||||
logging.debug("Testing re-encode of %s", name)
|
||||
re_dec = inst.decode_hex(encoded)
|
||||
re_enc = inst.encode_hex(re_dec, len(encoded)//2)
|
||||
re_enc = inst.encode_hex(re_dec)
|
||||
self.assertEqual(encoded.upper(), re_enc.upper())
|
||||
if hasattr(c, '_test_no_pad') and c._test_no_pad:
|
||||
continue
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH
|
||||
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||
@@ -25,7 +25,7 @@ import io
|
||||
import sys
|
||||
from inspect import currentframe, getframeinfo
|
||||
|
||||
log = PySimLogger.get(__name__)
|
||||
log = PySimLogger.get("TEST")
|
||||
|
||||
TEST_MSG_DEBUG = "this is a debug message"
|
||||
TEST_MSG_INFO = "this is an info message"
|
||||
@@ -82,15 +82,15 @@ class PySimLogger_Test(unittest.TestCase):
|
||||
PySimLogger.setup(self._test_print_callback)
|
||||
PySimLogger.set_verbose(True)
|
||||
frame = currentframe()
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- DEBUG: " + TEST_MSG_DEBUG
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
|
||||
log.debug(TEST_MSG_DEBUG)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- INFO: " + TEST_MSG_INFO
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
|
||||
log.info(TEST_MSG_INFO)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- WARNING: " + TEST_MSG_WARNING
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
|
||||
log.warning(TEST_MSG_WARNING)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- ERROR: " + TEST_MSG_ERROR
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
|
||||
log.error(TEST_MSG_ERROR)
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- CRITICAL: " + TEST_MSG_CRITICAL
|
||||
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
|
||||
log.critical(TEST_MSG_CRITICAL)
|
||||
|
||||
def test_04_level(self):
|
||||
|
||||
216
tests/unittests/test_param_src.py
Executable file
216
tests/unittests/test_param_src.py
Executable file
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import math
|
||||
from importlib import resources
|
||||
import unittest
|
||||
from pySim.esim.saip import param_source
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
class D:
|
||||
mandatory = set()
|
||||
optional = set()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for k in self.optional:
|
||||
if not hasattr(self, k):
|
||||
setattr(self, k, None)
|
||||
|
||||
decimals = '0123456789'
|
||||
hexadecimals = '0123456789abcdefABCDEF'
|
||||
|
||||
class FakeRandom:
|
||||
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
|
||||
i = 0
|
||||
|
||||
@classmethod
|
||||
def next(cls):
|
||||
cls.i = (cls.i + 1) % len(cls.vals)
|
||||
return cls.vals[cls.i]
|
||||
|
||||
@staticmethod
|
||||
def randint(a, b):
|
||||
d = b - a
|
||||
n_bytes = math.ceil(math.log(d, 2))
|
||||
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
|
||||
return a + (r % (b - a))
|
||||
|
||||
@staticmethod
|
||||
def randbytes(n):
|
||||
return bytes(FakeRandom.next() for i in range(n))
|
||||
|
||||
|
||||
class ParamSourceTest(unittest.TestCase):
|
||||
|
||||
def test_param_source(self):
|
||||
|
||||
class ParamSourceTest(D):
|
||||
mandatory = (
|
||||
'param_source',
|
||||
'n',
|
||||
'expect',
|
||||
)
|
||||
optional = (
|
||||
'expect_arg',
|
||||
'csv_rows',
|
||||
)
|
||||
|
||||
def expect_const(t, vals):
|
||||
return tuple(t.expect_arg) == tuple(vals)
|
||||
|
||||
def expect_random(t, vals):
|
||||
chars = t.expect_arg.get('digits')
|
||||
repetitions = (t.n - len(set(vals)))
|
||||
if repetitions:
|
||||
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
|
||||
for val_i in range(len(vals)):
|
||||
v = vals[val_i]
|
||||
val_minlen = t.expect_arg.get('val_minlen')
|
||||
val_maxlen = t.expect_arg.get('val_maxlen')
|
||||
if len(v) < val_minlen or len(v) > val_maxlen:
|
||||
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
|
||||
f' {val_minlen}..{val_maxlen}')
|
||||
|
||||
if chars is not None and not all(c in chars for c in v):
|
||||
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
|
||||
return True
|
||||
|
||||
param_source_tests = [
|
||||
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('123', '123', '123')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 5,
|
||||
'val_maxlen': 5,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 1,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 3,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('10001', '10002', '10003')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('first val', 'second val', 'third val'),
|
||||
csv_rows=(
|
||||
{'column_name': 'first val',},
|
||||
{'column_name': 'second val',},
|
||||
{'column_name': 'third val',},
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
outputs = []
|
||||
|
||||
for t in param_source_tests:
|
||||
try:
|
||||
if hasattr(t.param_source, 'random_impl'):
|
||||
t.param_source.random_impl = FakeRandom
|
||||
|
||||
vals = []
|
||||
for i in range(t.n):
|
||||
csv_row = None
|
||||
if t.csv_rows is not None:
|
||||
csv_row = t.csv_rows[i]
|
||||
vals.append( t.param_source.get_next(csv_row=csv_row) )
|
||||
if not t.expect(t, vals):
|
||||
raise RuntimeError(f'invalid values returned: returned {vals}')
|
||||
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_param_src'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
@@ -79,30 +79,6 @@ class DecTestCase(unittest.TestCase):
|
||||
def testDecMNCfromPLMN_unused_str(self):
|
||||
self.assertEqual(utils.dec_mnc_from_plmn_str("00f0ff"), "")
|
||||
|
||||
def testEncImsi(self):
|
||||
#Type IMSI, odd number of identity digits
|
||||
self.assertEqual(utils.enc_imsi("228062800000208"), "082982608200002080")
|
||||
self.assertEqual(utils.enc_imsi("001010000123456"), "080910100000214365")
|
||||
self.assertEqual(utils.enc_imsi("0010100001234"), "0709101000002143ff")
|
||||
|
||||
#Type IMSI, even number of identity digits
|
||||
self.assertEqual(utils.enc_imsi("22806280000028"), "0821826082000020f8")
|
||||
self.assertEqual(utils.enc_imsi("00101000012345"), "0801101000002143f5")
|
||||
self.assertEqual(utils.enc_imsi("001010000123"), "07011010000021f3ff")
|
||||
|
||||
def testDecImsi(self):
|
||||
#Type IMSI, odd number of identity digits
|
||||
self.assertEqual(utils.dec_imsi("082982608200002080"), "228062800000208")
|
||||
self.assertEqual(utils.dec_imsi("080910100000214365"), "001010000123456")
|
||||
self.assertEqual(utils.dec_imsi("0709101000002143ff"), "0010100001234")
|
||||
self.assertEqual(utils.dec_imsi("0709101000002143"), "0010100001234")
|
||||
|
||||
#Type IMSI, even number of identity digits
|
||||
self.assertEqual(utils.dec_imsi("0821826082000020f8"), "22806280000028")
|
||||
self.assertEqual(utils.dec_imsi("0801101000002143f5"), "00101000012345")
|
||||
self.assertEqual(utils.dec_imsi("07011010000021f3ff"), "001010000123")
|
||||
self.assertEqual(utils.dec_imsi("07011010000021f3"), "001010000123")
|
||||
|
||||
def test_enc_plmn(self):
|
||||
with self.subTest("2-digit MCC"):
|
||||
self.assertEqual(utils.enc_plmn("001", "01F"), "00F110")
|
||||
|
||||
1520
tests/unittests/xo/test_configurable_parameters
Normal file
1520
tests/unittests/xo/test_configurable_parameters
Normal file
File diff suppressed because it is too large
Load Diff
9
tests/unittests/xo/test_param_src
Normal file
9
tests/unittests/xo/test_param_src
Normal file
@@ -0,0 +1,9 @@
|
||||
ok: ConstantSource vals=['123', '123', '123']
|
||||
ok: RandomDigitSource vals=['13987', '49298', '55670']
|
||||
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
|
||||
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
|
||||
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
|
||||
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
|
||||
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
|
||||
ok: IncDigitSource vals=['10001', '10002', '10003']
|
||||
ok: CsvSource vals=['first val', 'second val', 'third val']
|
||||
Reference in New Issue
Block a user