53 Commits

Author SHA1 Message Date
Alexander Couzens
369bdb98c6 working without the step 0 hack
Change-Id: I52a8799be88a8cc5d6113466cd6f4d0d42f37adc
2025-12-16 14:49:21 +01:00
Alexander Couzens
4f30a9c2f7 osmo-smdpp: ignore EID check in EUM cert
Change-Id: Ia5baf42878f0e83b0d24350c5b265a8aadbe1a34
2025-12-16 14:21:13 +01:00
Alexander Couzens
51d91896ff Hack: make the congress profile working
Change-Id: I121a4a1a0c2279b7a44417a1f805b29dff1adc8e
2025-12-15 22:34:36 +01:00
Alexander Couzens
125449126d ts_31_102: EF SUCI_Calc_Info: fix decoding empty files
When trying to use `edit_binary_decoded` with an empty file, pysim
runs into a len(None) exception, because hpkl.to_dict()['hnet_pubkey_list'] returns
None.

Can reproduced with a CCC Camp 2023 usim and editing the file.

Change-Id: Ib8e322e65dd768bfd49e7a5620a2163f12a74ec7
2025-12-14 23:51:49 +01:00
Harald Welte
572a81f2af pySim.runtime: Fix file selection by upper case hex FID
When trying to remove a file (e.g. DF.5G_ProSe, 5FF0),
there seems to be a case sensitive check when checking for the dict:
pySim/runtime.py: get_file_for_filename():

478          def get_file_for_filename(self, name: str):
479              """Get the related CardFile object for a specified filename."""
480              sels = self.selected_file.get_selectables()
481              return sels[name]

The dict sels contains 5ff0, but not 5FF0.
The type of argument name is str. So a case sensitive check will be used.

Change-Id: Idd0db1f4bbd3ee9eec20f5fd0f4371c2882950cd
Closes: OS#6898
2025-12-10 13:34:27 +00:00
Neels Hofmeyr
ff4f2491b8 fix downstream error: ImportError: cannot import name 'style' from 'cmd2'
cmd2 version 3.0 was released, with significant API changes. Limit the
dependency to below 3.0, as already reflected in requirements.txt.

Seeing but not changing the discrepancy in minimum version:
requirements.txt has >2.6.2 while setup.py has >= 1.5.0.

Related: SYS#7775 SYS#7777
Change-Id: I5186f242dbc1b770e3ab8cdca7f27d2a1029fff6
2025-12-10 04:06:43 +01:00
Harald Welte
05fd870d1b contrib/saip-tool: Use repr() on security domain keys
Let's not reinvent the wheel of printing such data structures and use
the repr method provided by the respective class instead.  This also
adds the missing key_usage_qualifier information to the print-out,
as well as the mac_len of the key components.

Change-Id: Iaead4a02f07130fd00bcecc43e1c843f1c221e63
2025-12-09 16:23:49 +00:00
Harald Welte
c07ecbae52 pySim.esim.saip: Hex representation of SecurityDomainKey
Let's print the key_usage_qualifier in hexadecimal notation (more compact)

Change-Id: Ic9a92d53d73378eafca1760dd8351215bce1157a
2025-12-09 16:23:49 +00:00
Alexander Couzens
e20f9e6cdf ts_102_221: EF.ARR: fix read_arr_record
`read_arr_record 1` failed with an AttributeError exception
because RECORD_NR must be all caps.

Change-Id: If44f9f2271293d3063f6c527e5a68dcfaeb5942e
2025-12-04 15:32:16 +01:00
Philipp Maier
3f3f4e20e2 docs/conf.py: update copyright year
The copyright year of the docs is still at 2023, let's update it
to the current year.

Change-Id: Icf64670847d090a250f732d94d18e780e483239b
2025-11-25 17:14:54 +01:00
Philipp Maier
c2fb84251b card_key_provider: add missing type annotation
Related: SYS#7725
Change-Id: I45751d2b31976c04c03006d8baa195eef2576b4f
2025-11-21 17:49:09 +01:00
Philipp Maier
61541e7502 card_key_provider: refactor code and optimize out get_field method
The method get_field in the base class can be optimized out. This
also allows us to remove code dup in the card_key_provider_get_field
function.

Let's also fix the return code behavior. A get method in a
CardKeyProvider implementation should always return None in case
nothing is found. Also it should not crash in that case. This will
allow the card_key_provider_get function to move on to the next
CardKeyProvider. In case no CardKeyProvider yields any results, an
exception is appropriate since it is pointless to continue execution
with "None" as key material.

To make the debugging of problems easier, let's also print some debug
messages that inform the user what key/value pair and which
CardKeyProvider was queried. This will make it easier to investigate
in case an expected result was not found.

Related: SYS#7725
Change-Id: I4d6367b8eb057e7b2c06c8625094d8a1e4c8eef9
2025-11-21 17:49:09 +01:00
Philipp Maier
579214c4d0 card_key_provider: remove method _verify_get_data from base class
The method _verify_get_data was intended to be used to verify the
user input before it further processed but ended up to be a simple
check that only checks the name of the key column very basically.

Unfortunately it is difficult to generalize the check code as the
concrete implementation of those checks is highly format dependent.
With the advent of eUICCs, we now have two data formats with
different lookup keys, so a static list with valid lookup keys is
also no longer up to the task.

After all it makes not much sense to keep this method, so let's
remove it.

(From the technical perspective, the key column is not limitied to
any specif field. In theory it would even be possible to use the KI
as lookup key as well, even though it would not make sense in
practice)

Related: SYS#7725
Change-Id: Ibf5745fb8a4f927397adff33900731524715d6a9
2025-11-21 17:49:09 +01:00
Philipp Maier
4a7651eb65 pySim-shell: re-organize Card Key Provider related options
As we plan to support other formats as data source for the Card Key
Provider soon, the more commandline options may be added and it makes
sense to group the Card Key Provider options in a dedicated group.

Let's also rename the option "--csv-column-key" to just "--column-key".
The column encryption is a generic concept and not CSV format specific.
(let's silently keep the "--csv-column-key" argument so maintain backward
compatibility)

Related: SYS#7725
Change-Id: I5093f8383551f8c9b84342ca6674c1ebdbbfc19c
2025-11-21 17:49:09 +01:00
Philipp Maier
01a6724153 pySim-shell: add command to manually query the Card Key Provider
The Card Key Provider is a built in mechanism of pySim-shell which
allows the user to read key material from a CSV file in order to
avoid having to lookup and enter the key material himself. The
lookup normally done by the pySim-shell commands automatically.

However, in some cases it may also be useful to be able to query the
CSV file manually in order to get certain fields displayed. Such a
command is in particular helpful to check and diagnose the CSV data
source.

Related: SYS#7725
Change-Id: I76e0f883572a029bdca65a5a6b3eef306db1c221
2025-11-21 17:49:09 +01:00
Philipp Maier
a6ca5b7cd1 card_key_provider: remove unnecessary class property definitions
The two properties csv_file and csv_filename are defined by the
constructor anyway, let's remove the declaration in the class body
because it is not needed.

Change-Id: Ibbe8e17b03a4ba0041c0e9990a5e9614388d9c03
2025-11-21 17:49:09 +01:00
Philipp Maier
bcca2bffc2 card_key_provider: rename parameter filename to csv_filename
let's rename the parameter filename to csv_filename to make it
more clear to what kind of file this parameter refers.

Change-Id: Id5b7c61b5e72fb205e30d2787855b2c276840a7b
2025-11-21 17:49:09 +01:00
Philipp Maier
e80f96cc3b card_key_provider: use case-insensitive field names
It is common in CSV files that the columns have uppercase names, so we
have adopted this scheme when we started using the card_key_provider.

This also means that the API of the card_key_provider_get() and
card_key_provider_get_field() function now implicitly requires
uppercase field names like 'ICCID', 'ADM1', etc.

Unfortunately this may be unreliable, so let's convert the field
names to uppercase as soon as we receive them. This makes the API
case-insensitive and gives us the assurance that all field names
we ever work with are in uppercase.

Related: SYS#7725
Change-Id: I9d80752587e2ccff0963c10abd5a2f42f5868d79
2025-11-21 17:49:09 +01:00
Philipp Maier
4550574e03 card_key_provider: separate and refactor CSV column encryption
The CardKeyProviderCsv class implements a column decryption scheme
where columns are protected using a transport key. The CSV files
are enrcypted using contrib/csv-encrypt-columns.py.

The current implementation has two main problems:

- The decryption code in CardKeyProviderCsv is not specific to CSV files.
  It could be re-used in other formats, for example to decrypt columns
  (fields) red from a database. So let's split the decryption code in a
  separate class.

- The encryption code in csv-encrypt-columns.py accesses methods and
  properties in CardKeyProviderCsv. Also having the coresponding
  encryption code somewhere out of tree may be confusing. Let's improve
  the design and put encryption and decryption functions in a single
  class. Let's also make sure the encryption/decryption is covered by
  unittests.

Related: SYS#7725
Change-Id: I180457d4938f526d227c81020e4e03c6b3a57dab
2025-11-21 17:49:09 +01:00
Philipp Maier
08565e8a98 pySim-shell: use log level INFO by default
The default log level of the PySimLogger is DEBUG by default. This is
to ensure that all messages are printed in an unconfigured setup.

However in pySim-Shell we care about configuring the logger, so let's
set the debug log level to INFO in startup. This will allow us to
turn debug messages on and off using the verbose switch.

Change-Id: I89315f830ce1cc2d573887de4f4cf4e19d17543b
Related: SYS#7725
2025-11-21 17:49:09 +01:00
Harald Welte
fb20b7bc58 contrib: Add a small command line script to generate StoreMetadataRequest
It's occasionally useful to be able to manually generate a
SGP.22 StoreMetadataRequest (tag BF25), so let's add a small utility
program doing exactly that.

Change-Id: I56ebd040f09dcd167b0b22148c2f1af56240b3b5
2025-11-21 11:41:29 +00:00
Harald Welte
52df66cd56 pySim.esim.es8p: Support non-operational ProfileMetadata
If no profileClass is given, ProfileMetadata defaults to operational.
Let's add the capability to also generate metadata for test or provisioning profiles.

Change-Id: Id55537ed03e2690c1fc9545bb3c49cfc76d8e331
2025-11-21 11:41:29 +00:00
Philipp Maier
784cebded4 card_key_provider: add unit-test
There is no unit-test for the CardKeyProviderCsv class yet. Let's add
one to ensure that the CardKeyProviderCsv class keeps working as expected.

Related: SYS#7725
Change-Id: I52519847a4c4a13a7bca49985133872b01c4aaab
2025-11-18 15:47:40 +01:00
Philipp Maier
4f75aa1c8f card_key_provider: fix sourcecode formatting
Change-Id: I5675f9f087086646937ca077d3545d2729ccd812
2025-11-18 14:08:42 +01:00
Philipp Maier
94811ab585 pySim-shell: allow user friendly selection of the pin type
The CHV commands (verify_chv, enable_chv, disable_chv, unblock_chv)
provide a --pin-nr parameter.

The --pin-nr is a decimal parameter that specifies the pin type to be
used. The exact pin type numbers are specified in ETSI TS 102.221,
Table 9.3.

Unfortunately the --pin-nr parameter is not very intuitive to use, it
it requires the user to manually lookup the numeric value. The specs
list that value as hexadecimal, so the user also has to convert it
to decimal. To make this less complicated, let's also accept
hexadecimal numbers with the --pin-nr parameter.

However, this alone does not improve the user expierience much. Let's
also add a --pin-type parameter (similar to the --adm-type parameter
of the verify_adm command) to specifiy the pin type in a human
readable form.

Change-Id: I0b58c402d95cbc4fe690e6edb214829d463e9f2c
2025-11-01 14:03:23 +00:00
Philipp Maier
f3e6e85f99 osmo-smdpp: update documentation
osmo-smdpp has built-in SSL/TLS support for quite some time now. The manual does not
yet mention this feature yet.

Change-Id: I2db5ae32914386a34eab1ed7d2aff8cae82bfa9b
2025-10-31 16:31:45 +01:00
Philipp Maier
7f2cb157c8 osmo-smdpp: update commandline help and default port
osmo-smdpp has built-in TLS support for some time now. Let's update
update the commandline help to be more concise.

Since the built-in SSL/TLS support is enabled by default, let's also
update the default port from 8000 to 443.

Change-Id: Ib5a069a8612beb1a9716a7514b498ec70d141178
2025-10-31 16:31:40 +01:00
Philipp Maier
f94f366cf9 runtime: check record/file size before write
When writing data to a transparent or linear fixed (record oriented)
and the data to write exceeds the record/file size, then the UICC will
respond with an error "6700: Checking errors - Wrong length"

In particular when the data is supplied as a JSON object and not as a
hex string, it may not be immediately obvious to the average user what
the problem actually is.

Let's check the record/file size before writing the data and raise an
exception in case the data excieeds the record/file size. Let's also
print an informative string message in case the data length is less
than the record/file size to make the user aware of unwritten bytes
at the end of a record/file.

Related: OS#6864
Change-Id: I7fa717d803ae79398d2c5daf92a7336be660c5ad
2025-10-28 13:39:35 +01:00
Philipp Maier
4429e1cc70 pySim-shell: add a logger class to centralize logging
In many sub modules we still use print() to occassionally print status
messages or warnings. This technically does not hurt, but it is an unclean
solution which we should replace with something more mature.

Let's use python's built in logging framework to create a static logger
class that fits our needs. To maintain compatibility let's also make sure
that the logger class will behave like a normal print() statement when no
configuration parameters are supplied by the API user.

To illustrate how the approach can be used in sub-modules, this patch
replaces the print statements in runtime.py. The other print statements
will the be fixed in follow-up patches.

Related: OS#6864
Change-Id: I187f117e7e1ccdb2a85dfdfb18e84bd7561704eb
2025-10-25 19:46:34 +00:00
Philipp Maier
1ab2f8dd9d commands: do not use b2h with a string
The function h2b expects a bytearray and must not be used on a string.
This is also true for nullstrings ('').

Related: OS#6869
Change-Id: I0e28e6ec476901bf19aa0f8640e41c74aa6e3aa2
2025-10-21 17:17:21 +02:00
Oliver Smith
e5f39fbd34 Pass pylint 3.3.4 from debian trixie
************* Module osmo-smdpp
osmo-smdpp.py:657:72: E0606: Possibly using variable 'iccid_str' before assignment (possibly-used-before-assignment)

=> False-positive: code paths that don't set iccid_str raise an error, so
   this shouldn't be a problem.

************* Module pySim-smpp2sim
pySim-smpp2sim.py:427:4: E1101: Module 'twisted.internet.reactor' has no 'run' member (no-member)

=> False-positive: pylint doesn't recognize dynamically set attributes.

************* Module es9p_client
contrib/es9p_client.py:126:11: E0606: Possibly using variable 'opts' before assignment (possibly-used-before-assignment)

=> Real bug, should be "self.opts".

Related: https://stackoverflow.com/a/18712867
Change-Id: Id042ba0944b58d98d27e1222ac373c7206158a91
2025-10-02 09:06:44 +02:00
Harald Welte
947154639c pySim.esim.saip.FsNodeADF: Fix __str__ method
It's quite common for a FsNodeADF to not have a df_name, so we need
to guard against that during stringification to avoid an exception.

Change-Id: I919d7c46575e0ebcdf3b979347a5cdd1a9feb294
2025-09-24 17:59:17 +00:00
Kian-Meng Ang
4ee99c18cd Fix typos
Found via `codespell -S tests -L ist,adn,ciph,ue,ot,readd,te,oce,tye`

Change-Id: I00a72e4f479dcef88f7d1058ce53edd0129d336a
2025-09-24 17:59:17 +00:00
Eric Wild
5d2e2ee259 bsp: fix maxpayloadsize
Change-Id: I08f544877b79681ad1f758a1ca31c292eae9f868
2025-09-24 15:04:36 +00:00
Harald Welte
92841f2cd5 docs/suci-keytool.rst: spelling fix
Change-Id: Idb45086d9d5963072fbc97835d551e2f78ad847f
2025-09-04 18:57:27 +02:00
Bjoern Riemer
caa955b3ac Identify cards by Historical bytes of ATR
- try to identify the CardModel by just comparing the Historical Bytes if matching by Whole ATR failed
- add decompose ATR code from pyscard-contrib

Related: OS#6837
Change-Id: Id7555e42290d232a0e0efc47e7d97575007d846f
2025-08-28 21:44:24 +00:00
Bjoern Riemer
4dddcf932a Make sure to select MF before probing for files/Addons
Change-Id: I685367c282f57a8856668a3172a9391a5bbcf2e2
2025-08-28 21:44:24 +00:00
Oliver Smith
10fe0e3aae docs: fix authors line exceeding the page
Fix that the authors get cut off as they exceed the page in the PDF
version. This can currently be seen here:
https://downloads.osmocom.org/docs/pysim/master/osmopysim-usermanual.pdf

Change-Id: Iacbba6c2f74bf2b9f96057e71bde017a11f703a8
2025-08-27 14:31:13 +02:00
Oliver Smith
076fec267a requirements: set cmd2>=2.6.2,<3.0
Remove the previous workaround that set cmd2==2.4.3 in jenkins.sh. The
bug this worked around has been fixed in 2.6.2.

3.0 will break unless we use some new additional decorator.

Related: OS#6776
Change-Id: I4ba65ed486247c5670313b75f43a242d264df14b
2025-08-27 14:31:13 +02:00
Eric Wild
b4a12ecc14 smdp: update tls certs
Old expired today, new from https://www.gsma.com/solutions-and-impact/technologies/esim/wp-content/uploads/2021/07/SGP.26_v1.5-17-July-2025_files_v3.zip

Change-Id: I585efe3360a0aac2a49a79d5fef2789dea2b169d
2025-08-21 14:54:32 +00:00
Eric Wild
6cffb31b42 memory backed ephermal session store for easy concurrent runs
Change-Id: I05bfd6ff471ccf1c8c2b5f2b748b9d4125ddd4f7
2025-08-15 13:04:02 +02:00
Eric Wild
6aed97d6c8 smdpp: fix asn1tool OBJECT IDENTIFIER decoding
While at it make the linter happy.
The feature to ignore blocks is making slow progress:
https://github.com/astral-sh/ruff/issues/3711#

Change-Id: Ic678e6c4a4c1a01de87a8dce26f4a5e452e8562a
2025-08-15 13:04:02 +02:00
Eric Wild
cb7d5aa3a7 smdpp: add proper brp cert support
Change-Id: I6906732f7d193a9c2234075f4a82df5e0ed46100
2025-08-15 13:04:02 +02:00
Eric Wild
70fedb5a46 smdpp: verify cert chain
Change-Id: I1e4e4b1b032dc6a8b7d15bd80d533a50fe0cff15
2025-08-15 13:04:02 +02:00
Eric Wild
7798ea9c5c x509 cert: fix weird cert check
Change-Id: I18beab0e1b24579724704c4141a2c457b2d4cf99
2025-08-15 13:04:02 +02:00
Eric Wild
0b1d3c85fd smdpp: less verbose by default
Those data blobs are huge.

Change-Id: I04a72b8f52417862d4dcba1f0743700dd942ef49
2025-08-15 13:04:02 +02:00
Eric Wild
3c1a59640c smdp: clean up accidental commited trash
and update gitinore to ensure this does not happen again

Change-Id: Ieeb2da3bdb006b08e2f82bfc3b5252f4abf4420b
2025-08-15 13:04:01 +02:00
Eric Wild
ccefc98160 smdpp: add proper tls support, cert generation FOR TESTING
If TLS is enabled (default) it will automagically generate missing pem files + dh params.

A faithful reproduction of the certs found in SGP.26_v1.5_Certificates_18_07_2024.zip available at
https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/
can be generated by running contrib/generate_certs.py. This allows adjusting the expiry dates, CA flag,
and other parameters FOR TESTING. Certs can be used by the smdpp by running
$ python -u osmo-smdpp.py -c generated

Change-Id: I84b2666422b8ff565620f3827ef4d4d7635a21be
2025-06-25 10:22:42 +02:00
Eric Wild
79805d1dd7 smdpp: reorder imports
Change-Id: Ib72089fb75d71f0d33c9ea17e5491dd52558f532
2025-06-25 10:22:42 +02:00
Eric Wild
5969901be5 smdpp: Verify EID is within permitted range of EUM certificate
Change-Id: Ice704548cb62f14943927b5295007db13c807031
2025-06-21 13:18:38 +00:00
Eric Wild
5316f2b1cc smdpp: verify request headers
Change-Id: Ic1221bcb87a9975a013ab356266d3cb76d9241f1
2025-06-21 12:19:42 +00:00
Eric Wild
9572cbdb61 smdpp: update certs
TLS will expire again:
$ find . -iname "CERT*NIST*der" | xargs -ti  openssl x509 -in {} -inform DER -noout -checkend $((24*3600*90))
...
openssl x509 -in ./smdpp-data/generated/DPtls/CERT_S_SM_DP_TLS_NIST.der -inform DER -noout -checkend 7776000
Certificate will expire
...

Grabbed from SGP.26_v1.5_Certificates_18_07_2024.zip available at
https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/

Change-Id: I25442d6f55a385019bba1e47ad3d795120f850ad
2025-06-21 12:19:33 +00:00
Eric Wild
7fe7bff3d8 smdpp: optional deps
Works locally, too:
$ pip install -e ".[smdpp]"

Change-Id: If69b2bd5f8bc604443108c942c17eba9c22f4053
2025-06-16 13:37:43 +02:00
91 changed files with 2305 additions and 2385 deletions

6
.gitignore vendored
View File

@@ -7,6 +7,10 @@
/.local
/build
/pySim.egg-info
/smdpp-data/sm-dp-sessions
/smdpp-data/sm-dp-sessions*
dist
tags
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
smdpp-data/generated
smdpp-data/certs/dhparam2048.pem

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env python3
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import sys
import json
import logging
import argparse
from osmocom.utils import b2h
import websockets
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
from pySim.commands import SimCardCommands
from pySim.wsrc.client_blocking import WsClientBlocking
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
class CardWsClientBlocking(WsClientBlocking):
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
def __init__(self, ws_uri, tp: LinkBase):
super().__init__(ws_uri)
self.tp = tp
def perform_outbound_hello(self):
hello_data = {
'client_type': 'card',
'atr': b2h(self.tp.get_atr()),
# TODO: include various card information in the HELLO message
}
super().perform_outbound_hello(hello_data)
def handle_rx_c_apdu(self, rx: dict):
"""handle an inbound APDU transceive command"""
data, sw = self.tp.send_apdu(rx['command'])
tx = {
'response': data,
'sw': sw,
}
self.tx_json('r_apdu', tx)
def handle_rx_disconnect(self, rx: dict):
"""server tells us to disconnect"""
self.tx_json('disconnect_ack')
# FIXME: tear down connection and/or terminate entire program
def handle_rx_print(self, rx: dict):
"""print a message (text) given by server to the local console/log"""
print(rx['message'])
# no response
def handle_rx_reset_req(self, rx: dict):
"""server tells us to reset the card"""
self.tp.reset()
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
parser = argparse.ArgumentParser()
argparse_add_reader_args(parser)
parser.add_argument("--uri", default="ws://localhost:8765/",
help="URI of the sever to which to connect")
opts = parser.parse_args()
# open the card reader / slot
logger.info("Initializing Card Reader...")
tp = init_reader(opts)
logger.info("Connecting to Card...")
tp.connect()
scc = SimCardCommands(transport=tp)
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
# TODO: gather various information about the card; print it
# create + connect the client to the server
cl = CardWsClientBlocking(opts.uri, tp)
logger.info("Connecting to remote server...")
try:
cl.connect()
print("Successfully connected to Server")
except ConnectionRefusedError as e:
print(e)
sys.exit(1)
try:
while True:
# endless loop: wait for inbound command from server + execute it
cl.rx_and_execute_cmd()
# TODO: clean handling of websocket disconnect
except websockets.exceptions.ConnectionClosedOK as e:
print(e)
sys.exit(1)
except KeyboardInterrupt as e:
print(e.__class__.__name__)
sys.exit(2)

View File

@@ -1,241 +0,0 @@
#!/usr/bin/env python
import json
import asyncio
import logging
from typing import Optional, Tuple
from websockets.asyncio.server import serve
from websockets.exceptions import ConnectionClosedError
from osmocom.utils import Hexstr, swap_nibbles
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
from pySim.exceptions import SwMatchError
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
card_clients = set()
user_clients = set()
class WsClient:
def __init__(self, websocket, hello: dict):
self.websocket = websocket
self.hello = hello
self.identity = {}
def __str__(self):
return '%s(ws=%s)' % (self.__class__.__name__, self.websocket)
async def rx_json(self):
rx = await self.websocket.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
async def tx_json(self, msg_type:str, d: dict = {}):
"""Transmit a json-serializable dict to the peer"""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
await self.websocket.send(d_js)
async def tx_hello_ack(self):
await self.tx_json('hello_ack')
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
await self.tx_json(msg_type, d)
rx = await self.rx_json()
if exp_msg_type:
assert rx['msg_type'] == exp_msg_type
return rx;
async def tx_error(self, message: str):
"""Transmit an error message to the peer"""
event = {
"message": message,
}
await self.tx_json('error', event)
async def ws_hdlr(self):
"""kind of a 'main' function for the websocket client: wait for incoming message,
and handle it."""
try:
async for message in self.websocket:
method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
if not method:
await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
else:
method(message)
except ConnectionClosedError:
# we handle this in the outer loop
pass
class CardClient(WsClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 'init'
def __str__(self):
eid = self.identity.get('EID', None)
if eid:
return '%s(EID=%s)' % (self.__class__.__name__, eid)
iccid = self.identity.get('ICCID', None)
if iccid:
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
return super().__str__()
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
"""transceive a single APDU with the card"""
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return message['response'], message['sw']
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
prev_pdu = pdu
data, sw = await self.xceive_apdu_raw(pdu)
if sw is not None:
while (sw[0:2] in ['9f', '61', '62', '63']):
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
prev_pdu = pdu_gr
d, sw = await self.xceive_apdu_raw(pdu_gr)
data += d
if sw[0:2] == '6c':
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
pdu_gr = prev_pdu[0:8] + sw[2:4]
data, sw = await self.xceive_apdu_raw(pdu_gr)
return data, sw
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
"""like xceive_apdu, but checking the status word matches the expected pattern"""
rv = await self.xceive_apdu(pdu)
last_sw = rv[1]
if not sw_match(rv[1], sw):
raise SwMatchError(rv[1], sw.lower())
return rv
async def card_reset(self):
"""reset the card"""
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
async def get_iccid(self):
"""high-level method to obtain the ICCID of the card"""
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
return dec_iccid(res)
async def get_eid_sgp22(self):
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
return res[-32:]
async def identify(self):
# identify the card by asking for its EID and/or ICCID
try:
eid = await self.get_eid_sgp22()
logger.debug("EID: %s", eid)
self.identity['EID'] = eid
except SwMatchError:
pass
try:
iccid = await self.get_iccid()
logger.debug("ICCID: %s", iccid)
self.identity['ICCID'] = iccid
except SwMatchError:
pass
logger.info("Card now in READY state")
self.state = 'ready'
@staticmethod
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
for c in card_clients:
print("testing card %s in state %s" % (c, c.state))
if c.state != 'ready':
continue
c_id = c.identity.get(id_type.upper(), None)
if c_id and c_id.lower() == id_str.lower():
return c
return None
class UserClient(WsClient):
"""A websocket client representing a user application like pySim-shell."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 'init'
async def state_init(self):
"""Wait for incoming 'select_card' and process it."""
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'select_card':
# look-up if the card can be found
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
if not card:
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
continue
# transition to next statee
self.state = 'associated'
card.state = 'associated'
self.card = card
await self.tx_json('select_card_ack', {'identities': card.identity})
break
else:
self.tx_error('Unknown message type %s' % rx['msg_type'])
async def state_selected(self):
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'c_apdu':
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
async def ws_conn_hdlr(websocket):
rx_raw = await websocket.recv()
rx = json.loads(rx_raw)
assert rx['msg_type'] == 'hello'
client_type = rx['client_type']
logger.info("New client (type %s) connection accepted", client_type)
if client_type == 'card':
card = CardClient(websocket, rx)
await card.tx_hello_ack()
card_clients.add(card)
# first obtain the identity of the card
await card.identify()
# then go into the "main loop"
try:
await card.ws_hdlr()
finally:
logger.info("%s: connection closed", card)
card_clients.remove(card)
elif client_type == 'user':
user = UserClient(websocket, rx)
await user.tx_hello_ack()
user_clients.add(user)
# first wait for the user to specify the select the card
await user.state_init()
try:
await user.state_selected()
finally:
logger.info("%s: connection closed", user)
user_clients.remove(user)
else:
logger.info("Rejecting client (unknown type %s) connection", client_type)
raise ValueError
async def main():
async with serve(ws_conn_hdlr, "localhost", 8765):
await asyncio.get_running_loop().create_future() # run forever
asyncio.run(main())

View File

@@ -24,20 +24,12 @@ import argparse
from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h, Hexstr
from pySim.card_key_provider import CardKeyProviderCsv
from pySim.card_key_provider import CardKeyFieldCryptor
def dict_keys_to_upper(d: dict) -> dict:
return {k.upper():v for k,v in d.items()}
class CsvColumnEncryptor:
class CsvColumnEncryptor(CardKeyFieldCryptor):
def __init__(self, filename: str, transport_keys: dict):
self.filename = filename
self.transport_keys = dict_keys_to_upper(transport_keys)
def encrypt_col(self, colname:str, value: str) -> Hexstr:
key = self.transport_keys[colname]
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
return b2h(cipher.encrypt(h2b(value)))
self.crypt = CardKeyFieldCryptor(transport_keys)
def encrypt(self) -> None:
with open(self.filename, 'r') as infile:
@@ -49,9 +41,8 @@ class CsvColumnEncryptor:
cw.writeheader()
for row in cr:
for key_colname in self.transport_keys:
if key_colname in row:
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
for fieldname in cr.fieldnames:
row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname])
cw.writerow(row)
if __name__ == "__main__":
@@ -71,9 +62,5 @@ if __name__ == "__main__":
print("You must specify at least one key!")
sys.exit(1)
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
for name, key in csv_column_keys.items():
print("Encrypting column %s using AES key %s" % (name, key))
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
cce.encrypt()

View File

@@ -24,7 +24,7 @@ ICCID_HELP='The ICCID of the eSIM that shall be made available'
MATCHID_HELP='MatchingID that shall be used by profile download'
parser = argparse.ArgumentParser(description="""
Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
Utility to manually issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
@@ -63,7 +63,7 @@ if __name__ == '__main__':
data = {}
for k, v in vars(opts).items():
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
# remove keys from dict that shold not end up in JSON...
# remove keys from dict that should not end up in JSON...
continue
if v is not None:
data[k] = v

View File

@@ -68,7 +68,7 @@ parser_dl.add_argument('--confirmation-code',
# notification
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
help='Profile Management Opreation whoise occurrence shall be notififed')
help='Profile Management Operation whoise occurrence shall be notififed')
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
help='eUICC global notification sequence number')
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
@@ -123,8 +123,8 @@ class Es9pClient:
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
}
if opts.iccid:
ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
if self.opts.iccid:
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
if self.opts.operation == 'download':
pird = {

40
contrib/esim_gen_metadata.py Executable file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# (C) 2025 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
from osmocom.utils import h2b, swap_nibbles
from pySim.esim.es8p import ProfileMetadata
parser = argparse.ArgumentParser(description="""Utility program to generate profile metadata in the
StoreMetadataRequest format based on input values from the command line.""")
parser.add_argument('--iccid', required=True, help="ICCID of eSIM profile");
parser.add_argument('--spn', required=True, help="Service Provider Name");
parser.add_argument('--profile-name', required=True, help="eSIM Profile Name");
parser.add_argument('--profile-class', choices=['test', 'operational', 'provisioning'],
default='operational', help="Profile Class");
parser.add_argument('--outfile', required=True, help="Output File Name");
if __name__ == '__main__':
opts = parser.parse_args()
iccid_bin = h2b(swap_nibbles(opts.iccid))
pmd = ProfileMetadata(iccid_bin, spn=opts.spn, profile_name=opts.profile_name,
profile_class=opts.profile_class)
with open(opts.outfile, 'wb') as f:
f.write(pmd.gen_store_metadata_request())
print("Written StoreMetadataRequest to '%s'" % opts.outfile)

View File

@@ -1,206 +0,0 @@
#!/usr/bin/env python3
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# This is a script to generate a [partial] eSIM profile from the 'fsdump' of another USIM/ISIM. This is
# useful to generate an "as close as possible" eSIM from a physical USIM, as far as that is possible
# programmatically and in a portable way.
#
# Of course, this really only affects the file sytem aspects of the card. It's not possible
# to read the K/OPc or other authentication related parameters off a random USIM, and hence
# we cannot replicate that. Similarly, it's not possible to export the java applets from a USIM,
# and hence we cannot replicate those.
import argparse
from pySim.esim.saip import *
from pySim.ts_102_221 import *
class FsdumpToSaip:
def __init__(self, pes: ProfileElementSequence):
self.pes = pes
@staticmethod
def fcp_raw2saip_fcp(fname:str, fcp_raw: bytes) -> Dict:
"""Convert a raw TLV-encoded FCP to a SAIP dict-format (as needed by asn1encode)."""
ftype = fname.split('.')[0]
# use the raw FCP as basis so we don't get stuck with potentially old decoder bugs
# ore future format incompatibilities
fcp = FcpTemplate()
fcp.from_tlv(fcp_raw)
r = {}
r['fileDescriptor'] = fcp.child_by_type(FileDescriptor).to_bytes()
file_id = fcp.child_by_type(FileIdentifier)
if file_id:
r['fileID'] = file_id.to_bytes()
else:
if ftype in ['ADF']:
print('%s is an ADF but has no [mandatory] file_id!' % fname)
#raise ValueError('%s is an ADF but has no [mandatory] file_id!' % fname)
r['fileID'] = b'\x7f\xff' # FIXME: auto-increment
df_name = fcp.child_by_type(DfName)
if ftype in ['ADF']:
if not df_name:
raise ValueError('%s is an ADF but has no [mandatory] df_name!' % fname)
r['dfName'] = df_name.to_bytes()
lcsi_byte = fcp.child_by_type(LifeCycleStatusInteger).to_bytes()
if lcsi_byte != b'\x05':
r['lcsi'] = lcsi_byte
sa_ref = fcp.child_by_type(SecurityAttribReferenced)
if sa_ref:
r['securityAttributesReferenced'] = sa_ref.to_bytes()
file_size = fcp.child_by_type(FileSize)
if ftype in ['EF']:
if file_size:
r['efFileSize'] = file_size.to_bytes()
psdo = fcp.child_by_type(PinStatusTemplate_DO)
if ftype in ['MF', 'ADF', 'DF']:
if not psdo:
raise ValueError('%s is an %s but has no [mandatory] PinStatusTemplateDO' % fname)
else:
r['pinStatusTemplateDO'] = psdo.to_bytes()
sfid = fcp.child_by_type(ShortFileIdentifier)
if sfid and sfid.decoded:
if ftype not in ['EF']:
raise ValueError('%s is not an EF but has [forbidden] shortEFID' % fname)
r['shortEFID'] = sfid.to_bytes()
pinfo = fcp.child_by_type(ProprietaryInformation)
if pinfo and ftype in ['EF']:
spinfo = pinfo.child_by_type(SpecialFileInfo)
fill_p = pinfo.child_by_type(FillingPattern)
repeat_p = pinfo.child_by_type(RepeatPattern)
# only exists for BER-TLV files
max_fsize = pinfo.child_by_type(MaximumFileSize)
if spinfo or fill_p or repeat_p or max_fsize:
r['proprietaryEFInfo'] = {}
if spinfo:
r['proprietaryEFInfo']['specialFileInformation'] = spinfo.to_bytes()
if fill_p:
r['proprietaryEFInfo']['fillPattern'] = fill_p.to_bytes()
if repeat_p:
r['proprietaryEFInfo']['repeatPattern'] = repeat_p.to_bytes()
if max_fsize:
r['proprietaryEFInfo']['maximumFileSize'] = max_fsize.to_bytes()
# TODO: linkPath
return r
@staticmethod
def fcp_fsdump2saip(fsdump_ef: Dict):
"""Convert a file from its "fsdump" representation to the SAIP representation of a File type
in the decoded format as used by the asn1tools-generated codec."""
# first convert the FCP
path = fsdump_ef['path']
fdesc = FsdumpToSaip.fcp_raw2saip_fcp(path[-1], h2b(fsdump_ef['fcp_raw']))
r = [
('fileDescriptor', fdesc),
]
# then convert the body. We're doing a straight-forward conversion without any optimization
# like not encoding all-ff files. This should be done by a subsequent optimizer
if 'body' in fsdump_ef and fsdump_ef['body']:
if isinstance(fsdump_ef['body'], list):
for b_seg in fsdump_ef['body']:
r.append(('fillFileContent', h2b(b_seg)))
else:
r.append(('fillFileContent', h2b(fsdump_ef['body'])))
print(fsdump_ef['path'])
return r
def add_file_from_fsdump(self, fsdump_ef: Dict):
fid = int(fsdump_ef['fcp']['file_identifier'])
# determine NAA
if fsdump_ef['path'][0:1] == ['MF', 'ADF.USIM']:
naa = NaaUsim
elif fsdump_ef['path'][0:1] == ['MF', 'ADF.ISIM']:
naa = NaaIsim
else:
print("Unable to determine NAA for %s" % fsdump_ef['path'])
return
pes.pes_by_naa[naa.name]
for pe in pes:
print("PE %s" % pe)
if not isinstance(pe, FsProfileElement):
print("Skipping PE %s" % pe)
continue
if not pe.template:
print("No template for PE %s" % pe )
continue
if not fid in pe.template.files_by_fid:
print("File %04x not available in template; must create it using GenericFileMgmt" % fid)
parser = argparse.ArgumentParser()
parser.add_argument('fsdump', help='')
def has_unsupported_path_prefix(path: List[str]) -> bool:
# skip some paths from export as they don't exist in an eSIM profile
UNSUPPORTED_PATHS = [
['MF', 'DF.GSM'],
]
for p in UNSUPPORTED_PATHS:
prefix = path[:len(p)]
if prefix == p:
return True
# any ADF not USIM or ISIM are unsupported
SUPPORTED_ADFS = [ 'ADF.USIM', 'ADF.ISIM' ]
if len(path) == 2 and path[0] == 'MF' and path[1].startswith('ADF.') and path[1] not in SUPPORTED_ADFS:
return True
return False
import traceback
if __name__ == '__main__':
opts = parser.parse_args()
with open(opts.fsdump, 'r') as f:
fsdump = json.loads(f.read())
pes = ProfileElementSequence()
# FIXME: fsdump has strting-name path, but we need FID-list path for ProfileElementSequence
for path, fsdump_ef in fsdump['files'].items():
print("=" * 80)
#print(fsdump_ef)
if not 'fcp_raw' in fsdump_ef:
continue
if has_unsupported_path_prefix(fsdump_ef['path']):
print("Skipping eSIM-unsupported path %s" % ('/'.join(fsdump_ef['path'])))
continue
saip_dec = FsdumpToSaip.fcp_fsdump2saip(fsdump_ef)
#print(saip_dec)
try:
f = pes.add_file_at_path(Path(path), saip_dec)
print(repr(f))
except Exception as e:
print("EXCEPTION: %s" % traceback.format_exc())
#print("EXCEPTION: %s" % e)
continue
print("=== Tree === ")
pes.mf.print_tree()
# FIXME: export the actual PE Sequence

661
contrib/generate_smdpp_certs.py Executable file
View File

@@ -0,0 +1,661 @@
#!/usr/bin/env python3
"""
Faithfully reproduces the smdpp certs contained in SGP.26_v1.5_Certificates_18_07_2024.zip
available at https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/
Only usable for testing, it obviously uses a different CI key.
"""
import os
import binascii
from datetime import datetime
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtensionOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
# Custom OIDs used in certificates
OID_CERTIFICATE_POLICIES_CI = "2.23.146.1.2.1.0" # CI cert policy
OID_CERTIFICATE_POLICIES_TLS = "2.23.146.1.2.1.3" # DPtls cert policy
OID_CERTIFICATE_POLICIES_AUTH = "2.23.146.1.2.1.4" # DPauth cert policy
OID_CERTIFICATE_POLICIES_PB = "2.23.146.1.2.1.5" # DPpb cert policy
# Subject Alternative Name OIDs
OID_CI_RID = "2.999.1" # CI Registered ID
OID_DP_RID = "2.999.10" # DP+ Registered ID
OID_DP2_RID = "2.999.12" # DP+2 Registered ID
OID_DP4_RID = "2.999.14" # DP+4 Registered ID
OID_DP8_RID = "2.999.18" # DP+8 Registered ID
class SimplifiedCertificateGenerator:
def __init__(self):
self.backend = default_backend()
# Store generated CI keys to sign other certs
self.ci_certs = {} # {"BRP": cert, "NIST": cert}
self.ci_keys = {} # {"BRP": key, "NIST": key}
def get_curve(self, curve_type):
"""Get the appropriate curve object."""
if curve_type == "BRP":
return ec.BrainpoolP256R1()
else:
return ec.SECP256R1()
def generate_key_pair(self, curve):
"""Generate a new EC key pair."""
private_key = ec.generate_private_key(curve, self.backend)
return private_key
def load_private_key_from_hex(self, hex_key, curve):
"""Load EC private key from hex string."""
key_bytes = binascii.unhexlify(hex_key.replace(":", "").replace(" ", "").replace("\n", ""))
key_int = int.from_bytes(key_bytes, 'big')
return ec.derive_private_key(key_int, curve, self.backend)
def generate_ci_cert(self, curve_type):
"""Generate CI certificate for either BRP or NIST curve."""
curve = self.get_curve(curve_type)
private_key = self.generate_key_pair(curve)
# Build subject and issuer (self-signed) - same for both
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "Test CI"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "TESTCERT"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSPTEST"),
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
])
# Build certificate - all parameters same for both
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject)
builder = builder.issuer_name(issuer)
builder = builder.not_valid_before(datetime(2020, 4, 1, 8, 27, 51))
builder = builder.not_valid_after(datetime(2055, 4, 1, 8, 27, 51))
builder = builder.serial_number(0xb874f3abfa6c44d3)
builder = builder.public_key(private_key.public_key())
# Add extensions - all same for both
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True
)
builder = builder.add_extension(
x509.CertificatePolicies([
x509.PolicyInformation(
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_CI),
policy_qualifiers=None
)
]),
critical=True
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=False,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False
),
critical=True
)
builder = builder.add_extension(
x509.SubjectAlternativeName([
x509.RegisteredID(x509.ObjectIdentifier(OID_CI_RID))
]),
critical=False
)
builder = builder.add_extension(
x509.CRLDistributionPoints([
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
),
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
)
]),
critical=False
)
certificate = builder.sign(private_key, hashes.SHA256(), self.backend)
self.ci_keys[curve_type] = private_key
self.ci_certs[curve_type] = certificate
return certificate, private_key
def generate_dp_cert(self, curve_type, subject_cn, serial, key_hex,
cert_policy_oid, rid_oid, validity_start, validity_end):
"""Generate a DP certificate signed by CI - works for both BRP and NIST."""
curve = self.get_curve(curve_type)
private_key = self.load_private_key_from_hex(key_hex, curve)
ci_cert = self.ci_certs[curve_type]
ci_key = self.ci_keys[curve_type]
subject = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
])
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject)
builder = builder.issuer_name(ci_cert.subject)
builder = builder.not_valid_before(validity_start)
builder = builder.not_valid_after(validity_end)
builder = builder.serial_number(serial)
builder = builder.public_key(private_key.public_key())
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.SubjectAlternativeName([
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
]),
critical=False
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False
),
critical=True
)
builder = builder.add_extension(
x509.CertificatePolicies([
x509.PolicyInformation(
x509.ObjectIdentifier(cert_policy_oid),
policy_qualifiers=None
)
]),
critical=True
)
builder = builder.add_extension(
x509.CRLDistributionPoints([
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
),
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
)
]),
critical=False
)
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
return certificate, private_key
def generate_tls_cert(self, curve_type, subject_cn, dns_name, serial, key_hex,
rid_oid, validity_start, validity_end):
"""Generate a TLS certificate signed by CI."""
curve = self.get_curve(curve_type)
private_key = self.load_private_key_from_hex(key_hex, curve)
ci_cert = self.ci_certs[curve_type]
ci_key = self.ci_keys[curve_type]
subject = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
])
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject)
builder = builder.issuer_name(ci_cert.subject)
builder = builder.not_valid_before(validity_start)
builder = builder.not_valid_after(validity_end)
builder = builder.serial_number(serial)
builder = builder.public_key(private_key.public_key())
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False
),
critical=True
)
builder = builder.add_extension(
x509.ExtendedKeyUsage([
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
]),
critical=True
)
builder = builder.add_extension(
x509.CertificatePolicies([
x509.PolicyInformation(
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_TLS),
policy_qualifiers=None
)
]),
critical=False
)
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(dns_name),
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
]),
critical=False
)
builder = builder.add_extension(
x509.CRLDistributionPoints([
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
),
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
)
]),
critical=False
)
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
return certificate, private_key
def generate_eum_cert(self, curve_type, key_hex):
"""Generate EUM certificate signed by CI."""
curve = self.get_curve(curve_type)
private_key = self.load_private_key_from_hex(key_hex, curve)
ci_cert = self.ci_certs[curve_type]
ci_key = self.ci_keys[curve_type]
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
x509.NameAttribute(NameOID.COMMON_NAME, "EUM Test"),
])
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject)
builder = builder.issuer_name(ci_cert.subject)
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 28, 37))
builder = builder.not_valid_after(datetime(2054, 3, 24, 9, 28, 37))
builder = builder.serial_number(0x12345678)
builder = builder.public_key(private_key.public_key())
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=False,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=False,
encipher_only=False,
decipher_only=False
),
critical=True
)
builder = builder.add_extension(
x509.CertificatePolicies([
x509.PolicyInformation(
x509.ObjectIdentifier("2.23.146.1.2.1.2"), # EUM policy
policy_qualifiers=None
)
]),
critical=True
)
builder = builder.add_extension(
x509.SubjectAlternativeName([
x509.RegisteredID(x509.ObjectIdentifier("2.999.5"))
]),
critical=False
)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=0),
critical=True
)
builder = builder.add_extension(
x509.CRLDistributionPoints([
x509.DistributionPoint(
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
relative_name=None,
reasons=None,
crl_issuer=None
)
]),
critical=False
)
# Name Constraints
constrained_name = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032"),
])
name_constraints = x509.NameConstraints(
permitted_subtrees=[
x509.DirectoryName(constrained_name)
],
excluded_subtrees=None
)
builder = builder.add_extension(
name_constraints,
critical=True
)
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
return certificate, private_key
def generate_euicc_cert(self, curve_type, eum_cert, eum_key, key_hex):
"""Generate eUICC certificate signed by EUM."""
curve = self.get_curve(curve_type)
private_key = self.load_private_key_from_hex(key_hex, curve)
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032123451234512345678901235"),
x509.NameAttribute(NameOID.COMMON_NAME, "Test eUICC"),
])
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject)
builder = builder.issuer_name(eum_cert.subject)
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 48, 58))
builder = builder.not_valid_after(datetime(7496, 1, 24, 9, 48, 58))
builder = builder.serial_number(0x0200000000000001)
builder = builder.public_key(private_key.public_key())
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(eum_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False
),
critical=True
)
builder = builder.add_extension(
x509.CertificatePolicies([
x509.PolicyInformation(
x509.ObjectIdentifier("2.23.146.1.2.1.1"), # eUICC policy
policy_qualifiers=None
)
]),
critical=True
)
certificate = builder.sign(eum_key, hashes.SHA256(), self.backend)
return certificate, private_key
def save_cert_and_key(self, cert, key, cert_path_der, cert_path_pem, key_path_sk, key_path_pk):
"""Save certificate and key in various formats."""
# Create directories if needed
os.makedirs(os.path.dirname(cert_path_der), exist_ok=True)
with open(cert_path_der, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.DER))
if cert_path_pem:
with open(cert_path_pem, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
if key and key_path_sk:
with open(key_path_sk, "wb") as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
if key and key_path_pk:
with open(key_path_pk, "wb") as f:
f.write(key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
def main():
gen = SimplifiedCertificateGenerator()
output_dir = "smdpp-data/generated"
os.makedirs(output_dir, exist_ok=True)
print("=== Generating CI Certificates ===")
for curve_type in ["BRP", "NIST"]:
ci_cert, ci_key = gen.generate_ci_cert(curve_type)
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
gen.save_cert_and_key(
ci_cert, ci_key,
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.der",
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.pem",
None, None
)
print(f"Generated CI {curve_type} certificate")
print("\n=== Generating DPauth Certificates ===")
dpauth_configs = [
("BRP", "TEST SM-DP+", 256, "93:fb:33:d0:58:4f:34:9b:07:f8:b5:d2:af:93:d7:c3:e3:54:b3:49:a3:b9:13:50:2e:6a:bc:07:0e:4d:49:29", OID_DP_RID, "DPauth"),
("NIST", "TEST SM-DP+", 256, "0a:7c:c1:c2:44:e6:0c:52:cd:5b:78:07:ab:8c:36:0c:26:52:46:01:50:7d:ca:bc:5d:d5:98:b5:a6:16:d5:d5", OID_DP_RID, "DPauth"),
("BRP", "TEST SM-DP+2", 512, "0c:17:35:5c:01:1d:0f:e8:d7:da:dd:63:f1:97:85:cf:6c:51:cb:cd:46:6a:e8:8b:e8:f8:1b:c1:05:88:46:f6", OID_DP2_RID, "DP2auth"),
("NIST", "TEST SM-DP+2", 512, "9c:32:a0:95:d4:88:42:d9:ff:a4:04:f7:12:51:2a:a2:c5:42:5a:1a:26:38:6a:b6:a1:45:d5:81:1e:03:91:41", OID_DP2_RID, "DP2auth"),
]
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dpauth_configs:
cert, key = gen.generate_dp_cert(
curve_type, cn, serial, key_hex,
OID_CERTIFICATE_POLICIES_AUTH, rid_oid,
datetime(2020, 4, 1, 8, 31, 30),
datetime(2030, 3, 30, 8, 31, 30)
)
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
gen.save_cert_and_key(
cert, key,
f"{output_dir}/DPauth/CERT_S_SM_{name_prefix}{suffix}.der",
None,
f"{output_dir}/DPauth/SK_S_SM_{name_prefix}{suffix}.pem",
f"{output_dir}/DPauth/PK_S_SM_{name_prefix}{suffix}.pem"
)
print(f"Generated {name_prefix} {curve_type} certificate")
print("\n=== Generating DPpb Certificates ===")
dppb_configs = [
("BRP", "TEST SM-DP+", 257, "75:ff:32:2f:41:66:16:da:e1:a4:84:ef:71:d4:87:4f:b0:df:32:95:fd:35:c2:cb:a4:89:fb:b2:bb:9c:7b:f6", OID_DP_RID, "DPpb"),
("NIST", "TEST SM-DP+", 257, "dc:d6:94:b7:78:95:7e:8e:9a:dd:bd:d9:44:33:e9:ef:8f:73:d1:1e:49:1c:48:d4:25:a3:8a:94:91:bd:3b:ed", OID_DP_RID, "DPpb"),
("BRP", "TEST SM-DP+2", 513, "9c:ae:2e:1a:56:07:a9:d5:78:38:2e:ee:93:2e:25:1f:52:30:4f:86:ee:b1:f1:70:8c:db:d3:c0:7b:e2:cd:3d", OID_DP2_RID, "DP2pb"),
("NIST", "TEST SM-DP+2", 513, "66:93:11:49:63:9d:ba:ac:1d:c3:d3:06:c5:8b:d2:df:d2:2f:73:bf:63:ac:86:31:98:32:90:b5:7f:90:93:45", OID_DP2_RID, "DP2pb"),
]
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dppb_configs:
cert, key = gen.generate_dp_cert(
curve_type, cn, serial, key_hex,
OID_CERTIFICATE_POLICIES_PB, rid_oid,
datetime(2020, 4, 1, 8, 34, 46),
datetime(2030, 3, 30, 8, 34, 46)
)
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
gen.save_cert_and_key(
cert, key,
f"{output_dir}/DPpb/CERT_S_SM_{name_prefix}{suffix}.der",
None,
f"{output_dir}/DPpb/SK_S_SM_{name_prefix}{suffix}.pem",
f"{output_dir}/DPpb/PK_S_SM_{name_prefix}{suffix}.pem"
)
print(f"Generated {name_prefix} {curve_type} certificate")
print("\n=== Generating DPtls Certificates ===")
dptls_configs = [
("BRP", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "3f:67:15:28:02:b3:f4:c7:fa:e6:79:58:55:f6:82:54:1e:45:e3:5e:ff:f4:e8:a0:55:65:a0:f1:91:2a:78:2e", OID_DP_RID, "DP_TLS_BRP"),
("NIST", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "a0:3e:7c:e4:55:04:74:be:a4:b7:a8:73:99:ce:5a:8c:9f:66:1b:68:0f:94:01:39:ff:f8:4e:9d:ec:6a:4d:8c", OID_DP_RID, "DP_TLS_NIST"),
("NIST", "testsmdpplus2.example.com", "testsmdpplus2.example.com", 12, "4e:65:61:c6:40:88:f6:69:90:7a:db:e3:94:b1:1a:84:24:2e:03:3a:82:a8:84:02:31:63:6d:c9:1b:4e:e3:f5", OID_DP2_RID, "DP2_TLS"),
("NIST", "testsmdpplus4.example.com", "testsmdpplus4.example.com", 14, "f2:65:9d:2f:52:8f:4b:11:37:40:d5:8a:0d:2a:f3:eb:2b:48:e1:22:c2:b6:0a:6a:f6:fc:96:ad:86:be:6f:a4", OID_DP4_RID, "DP4_TLS"),
("NIST", "testsmdpplus8.example.com", "testsmdpplus8.example.com", 18, "ff:6e:4a:50:9b:ad:db:38:10:88:31:c2:3c:cc:2d:44:30:7a:f2:81:e9:25:96:7f:8c:df:1d:95:54:a0:28:8d", OID_DP8_RID, "DP8_TLS"),
]
for curve_type, cn, dns, serial, key_hex, rid_oid, name_prefix in dptls_configs:
cert, key = gen.generate_tls_cert(
curve_type, cn, dns, serial, key_hex, rid_oid,
datetime(2024, 7, 9, 15, 29, 36),
datetime(2025, 8, 11, 15, 29, 36)
)
gen.save_cert_and_key(
cert, key,
f"{output_dir}/DPtls/CERT_S_SM_{name_prefix}.der",
None,
f"{output_dir}/DPtls/SK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem",
f"{output_dir}/DPtls/PK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem"
)
print(f"Generated {name_prefix} certificate")
print("\n=== Generating EUM Certificates ===")
eum_configs = [
("BRP", "12:9b:0a:b1:3f:17:e1:4a:40:b6:fa:4e:d8:23:e0:cf:46:5b:7b:3d:73:24:05:e6:29:5d:3b:23:b0:45:c9:9a"),
("NIST", "25:e6:75:77:28:e1:e9:51:13:51:9c:dc:34:55:5c:29:ba:ed:23:77:3a:c5:af:dd:dc:da:d9:84:89:8a:52:f0"),
]
eum_certs = {}
eum_keys = {}
for curve_type, key_hex in eum_configs:
cert, key = gen.generate_eum_cert(curve_type, key_hex)
eum_certs[curve_type] = cert
eum_keys[curve_type] = key
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
gen.save_cert_and_key(
cert, key,
f"{output_dir}/EUM/CERT_EUM{suffix}.der",
None,
f"{output_dir}/EUM/SK_EUM{suffix}.pem",
f"{output_dir}/EUM/PK_EUM{suffix}.pem"
)
print(f"Generated EUM {curve_type} certificate")
print("\n=== Generating eUICC Certificates ===")
euicc_configs = [
("BRP", "8d:c3:47:a7:6d:b7:bd:d6:22:2d:d7:5e:a1:a1:68:8a:ca:81:1e:4c:bc:6a:7f:6a:ef:a4:b2:64:19:62:0b:90"),
("NIST", "11:e1:54:67:dc:19:4f:33:71:83:e4:60:c9:f6:32:60:09:1e:12:e8:10:26:cd:65:61:e1:7c:6d:85:39:cc:9c"),
]
for curve_type, key_hex in euicc_configs:
cert, key = gen.generate_euicc_cert(curve_type, eum_certs[curve_type], eum_keys[curve_type], key_hex)
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
gen.save_cert_and_key(
cert, key,
f"{output_dir}/eUICC/CERT_EUICC{suffix}.der",
None,
f"{output_dir}/eUICC/SK_EUICC{suffix}.pem",
f"{output_dir}/eUICC/PK_EUICC{suffix}.pem"
)
print(f"Generated eUICC {curve_type} certificate")
print("\n=== Certificate generation complete! ===")
print(f"All certificates saved to: {output_dir}/")
if __name__ == "__main__":
main()

View File

@@ -82,10 +82,6 @@ case "$JOB_TYPE" in
pip install -r requirements.txt
# XXX: workaround for https://github.com/python-cmd2/cmd2/issues/1414
# 2.4.3 was the last stable release not affected by this bug (OS#6776)
pip install cmd2==2.4.3
rm -rf docs/_build
make -C "docs" html latexpdf

View File

@@ -56,7 +56,7 @@ parser_rpe.add_argument('--output-file', required=True, help='Output file name')
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
parser_rn = subparsers.add_parser('remove-naa', help='Remove speciifed NAAs from PE-Sequence')
parser_rn = subparsers.add_parser('remove-naa', help='Remove specified NAAs from PE-Sequence')
parser_rn.add_argument('--output-file', required=True, help='Output file name')
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
# TODO: add an --naa-index or the like, so only one given instance can be removed
@@ -329,7 +329,7 @@ def do_info(pes: ProfileElementSequence, opts):
print("Security domain Instance AID: %s" % b2h(sd.decoded['instance']['instanceAID']))
# FIXME: 'applicationSpecificParametersC9' parsing to figure out enabled SCP
for key in sd.keys:
print("\tKVN=0x%02x, KID=0x%02x, %s" % (key.key_version_number, key.key_identifier, key.key_components))
print("\t%s" % repr(key))
# RFM
print()

View File

@@ -27,5 +27,5 @@ PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH add-app-i
# Display the contents of the resulting application PE:
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
# For an explaination of --uicc-toolkit-app-spec-pars, see:
# For an explanation of --uicc-toolkit-app-spec-pars, see:
# ETSI TS 102 226, section 8.2.1.3.2.2.1

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# A more useful verion of the 'unber' tool provided with asn1c:
# A more useful version of the 'unber' tool provided with asn1c:
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
import sys

View File

@@ -18,9 +18,17 @@ sys.path.insert(0, os.path.abspath('..'))
# -- Project information -----------------------------------------------------
project = 'osmopysim-usermanual'
copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
copyright = '2009-2025 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
# manually as line break (https://github.com/sphinx-doc/sphinx/issues/6875)
latex_elements = {
"maketitle":
r"""\author{Sylvain Munaut, Harald Welte, Philipp Maier, \and Supreeth Herle, Merlin Chlosta}
\sphinxmaketitle
"""
}
# -- General configuration ---------------------------------------------------

View File

@@ -48,7 +48,6 @@ pySim consists of several parts:
sim-rest
suci-keytool
saip-tool
wsrc
Indices and tables

View File

@@ -49,7 +49,7 @@ Two modes are possible:
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
``--num`` remains unchanged.
The parameter ``--num`` specifies a card individual number. This number will be manged into the random seed so that
The parameter ``--num`` specifies a card individual number. This number will be managed into the random seed so that
it serves as an identifier for a particular set of randomly generated parameters.
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
@@ -77,7 +77,7 @@ the parameter ``--type``. The following card types are supported:
Specifying the card reader:
It is most common to use ``pySim-prog`` together whith a PCSC reader. The PCSC reader number is specified via the
It is most common to use ``pySim-prog`` together with a PCSC reader. The PCSC reader number is specified via the
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
the ``--help`` option of ``pySim-prog`` for more information.

View File

@@ -21,9 +21,9 @@ osmo-smdpp currently
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with mathcing eUICCs.
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with matching eUICCs.
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
of the EID or whether it was donwloaded before. This is actually very useful for R&D and testing, as it
of the EID or whether it was downloaded before. This is actually very useful for R&D and testing, as it
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
production usage.
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
@@ -40,16 +40,21 @@ osmo-smdpp currently
Running osmo-smdpp
------------------
osmo-smdpp does not have built-in TLS support as the used *twisted* framework appears to have
problems when using the example elliptic curve certificates (both NIST and Brainpool) from GSMA.
osmo-smdpp comes with built-in TLS support which is enabled by default. However, it is always possible to
disable the built-in TLS support if needed.
In order to use osmo-smdpp without the built-in TLS support, it has to be put behind a TLS reverse proxy,
which terminates the ES9+ HTTPS traffic from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
NOTE: The built in TLS support in osmo-smdpp makes use of the python *twisted* framework. Older versions
of this framework appear to have problems when using the example elliptic curve certificates (both NIST and
Brainpool) from GSMA.
So in order to use it, you have to put it behind a TLS reverse proxy, which terminates the ES9+
HTTPS from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
nginx as TLS proxy
~~~~~~~~~~~~~~~~~~
If you use `nginx` as web server, you can use the following configuration snippet::
If you chose to use `nginx` as TLS reverse proxy, you can use the following configuration snippet::
upstream smdpp {
server localhost:8000;
@@ -92,32 +97,43 @@ The `smdpp-data/upp` directory contains the UPP (Unprotected Profile Package) us
commandline options
~~~~~~~~~~~~~~~~~~~
Typically, you just run it without any arguments, and it will bind its plain-HTTP ES9+ interface to
`localhost` TCP port 8000.
Typically, you just run osmo-smdpp without any arguments, and it will bind its built-in HTTPS ES9+ interface to
`localhost` TCP port 443. In this case an external TLS reverse proxy is not needed.
osmo-smdpp currently doesn't have any configuration file.
There are command line options for binding:
Bind the HTTP ES9+ to a port other than 8000::
Bind the HTTPS ES9+ to a port other than 443::
./osmo-smdpp.py -p 8001
./osmo-smdpp.py -p 8443
Disable the built-in TLS support and bind the plain-HTTP ES9+ to a port 8000::
./osmo-smdpp.py -p 8000 --nossl
Bind the HTTP ES9+ to a different local interface::
./osmo-smdpp.py -H 127.0.0.1
./osmo-smdpp.py -H 127.0.0.2
DNS setup for your LPA
~~~~~~~~~~~~~~~~~~~~~~
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
It must also accept the TLS certificates used by your TLS proxy.
It must also accept the TLS certificates used by your TLS proxy. In case osmo-smdpp is used with built-in TLS support,
it will use the certificates provided in smdpp-data.
NOTE: The HTTPS ES9+ interface cannot be addressed by the LPA directly via its IP address. The reason for this is that
the included SGP.26 (DPtls) test certificates explicitly restrict the hostname to `testsmdpplus1.example.com` in the
`X509v3 Subject Alternative Name` extension. Using a bare IP address as hostname may cause the certificate to be
rejected by the LPA.
Supported eUICC
~~~~~~~~~~~~~~~
If you run osmo-smdpp with the included SGP.26 certificates, you must use an eUICC with matching SGP.26
If you run osmo-smdpp with the included SGP.26 (DPauth, DPpb) certificates, you must use an eUICC with matching SGP.26
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
in turn must be signed by that SGP.26 EUM certificate.

View File

@@ -75,7 +75,7 @@ The response body is a JSON document, either
#. key freshness failure
#. unspecified card error
Example (succcess):
Example (success):
::
{

View File

@@ -17,7 +17,7 @@ In any case, in order to operate a SUCI-enabled 5G SA network, you will have to
#. deploy the public key on your USIMs
#. deploy the private key on your 5GC, specifically the UDM function
pysim contains (int its `contrib` directory) a small utility program that can make it easy to generate
pysim contains (in its `contrib` directory) a small utility program that can make it easy to generate
such keys: `suci-keytool.py`
Generating keys

View File

@@ -30,7 +30,7 @@ This guide covers the basic workflow of provisioning SIM cards with the 5G SUCI
For specific information on sysmocom SIM cards, refer to
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the curent
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the current
sysmoISIM-SJA5 product
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
sysmoISIM-SJA2 product

View File

@@ -1,31 +0,0 @@
WebSocket Remote Card (WSRC)
============================
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
method was chosen to be as firewall/NAT friendly as possible.
WSRC Network Architecture
-------------------------
In a WSRC network, there are three major elements:
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
to a remote *WSRC Server*
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
WSRC Protocol
-------------
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
is based on HTTP and should usually operate via TLS for security reasons.
The detailed protocol is currently still WIP. The plan is to document it here.
pySim implementations
---------------------
TBD

View File

@@ -17,28 +17,124 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import sys
import argparse
import uuid
import os
import functools
from typing import Optional, Dict, List
from pprint import pprint as pp
import base64
from base64 import b64decode
from klein import Klein
from twisted.web.iweb import IRequest
# asn1tools issue https://github.com/eerimoq/asn1tools/issues/194
# must be first here
import asn1tools
import asn1tools.codecs.ber
import asn1tools.codecs.der
# do not move the code
def fix_asn1_oid_decoding():
fix_asn1_schema = """
TestModule DEFINITIONS ::= BEGIN
TestOid ::= SEQUENCE {
oid OBJECT IDENTIFIER
}
END
"""
from osmocom.utils import h2b, b2h, swap_nibbles
fix_asn1_asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
fix_asn1_oid_string = '2.999.10'
fix_asn1_encoded = fix_asn1_asn1.encode('TestOid', {'oid': fix_asn1_oid_string})
fix_asn1_decoded = fix_asn1_asn1.decode('TestOid', fix_asn1_encoded)
import pySim.esim.rsp as rsp
from pySim.esim import saip, PMO
from pySim.esim.es8p import *
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
if (fix_asn1_decoded['oid'] != fix_asn1_oid_string):
# ASN.1 OBJECT IDENTIFIER Decoding Issue:
#
# In ASN.1 BER/DER encoding, the first two arcs of an OBJECT IDENTIFIER are
# combined into a single value: (40 * arc0) + arc1. This is encoded as a base-128
# variable-length quantity (and commonly known as VLQ or base-128 encoding)
# as specified in ITU-T X.690 §8.19, it can span multiple bytes if
# the value is large.
#
# For arc0 = 0 or 1, arc1 must be in [0, 39]. For arc0 = 2, arc1 can be any non-negative integer.
# All subsequent arcs (arc2, arc3, ...) are each encoded as a separate base-128 VLQ.
#
# The decoding bug occurs when the decoder does not properly split the first
# subidentifier for arc0 = 2 and arc1 >= 40. Instead of decoding:
# - arc0 = 2
# - arc1 = (first_subidentifier - 80)
# it may incorrectly interpret the first_subidentifier as arc0 = (first_subidentifier // 40),
# arc1 = (first_subidentifier % 40), which is only valid for arc1 < 40.
#
# This patch handles it properly for all valid OBJECT IDENTIFIERs
# with large second arcs, by applying the ASN.1 rules:
# - if first_subidentifier < 40: arc0 = 0, arc1 = first_subidentifier
# - elif first_subidentifier < 80: arc0 = 1, arc1 = first_subidentifier - 40
# - else: arc0 = 2, arc1 = first_subidentifier - 80
#
# This problem is not uncommon, see for example https://github.com/randombit/botan/issues/4023
def fixed_decode_object_identifier(data, offset, end_offset):
"""Decode ASN.1 OBJECT IDENTIFIER from bytes to dotted string, fixing large second arc handling."""
def read_subidentifier(data, offset):
value = 0
while True:
b = data[offset]
value = (value << 7) | (b & 0x7F)
offset += 1
if not (b & 0x80):
break
return value, offset
subid, offset = read_subidentifier(data, offset)
if subid < 40:
first = 0
second = subid
elif subid < 80:
first = 1
second = subid - 40
else:
first = 2
second = subid - 80
arcs = [first, second]
while offset < end_offset:
subid, offset = read_subidentifier(data, offset)
arcs.append(subid)
return '.'.join(str(x) for x in arcs)
asn1tools.codecs.ber.decode_object_identifier = fixed_decode_object_identifier
asn1tools.codecs.der.decode_object_identifier = fixed_decode_object_identifier
# test our patch
asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
decoded = asn1.decode('TestOid', fix_asn1_encoded)['oid']
assert fix_asn1_oid_string == str(decoded)
fix_asn1_oid_decoding()
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature # noqa: E402
from cryptography import x509 # noqa: E402
from cryptography.exceptions import InvalidSignature # noqa: E402
from cryptography.hazmat.primitives import hashes # noqa: E402
from cryptography.hazmat.primitives.asymmetric import ec, dh # noqa: E402
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat # noqa: E402
from pathlib import Path # noqa: E402
import json # noqa: E402
import sys # noqa: E402
import argparse # noqa: E402
import uuid # noqa: E402
import os # noqa: E402
import functools # noqa: E402
from typing import Optional, Dict, List # noqa: E402
from pprint import pprint as pp # noqa: E402
import base64 # noqa: E402
from base64 import b64decode # noqa: E402
from klein import Klein # noqa: E402
from twisted.web.iweb import IRequest # noqa: E402
from osmocom.utils import h2b, b2h, swap_nibbles # noqa: E402
import pySim.esim.rsp as rsp # noqa: E402
from pySim.esim import saip, PMO # noqa: E402
from pySim.esim.es8p import ProfileMetadata,UnprotectedProfilePackage,ProtectedProfilePackage,BoundProfilePackage,BspInstance # noqa: E402
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id # noqa: E402
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError # noqa: E402
import logging # noqa: E402
logger = logging.getLogger(__name__)
# HACK: make this configurable
DATA_DIR = './smdpp-data'
@@ -54,6 +150,173 @@ def set_headers(request: IRequest):
request.setHeader('Content-Type', 'application/json;charset=UTF-8')
request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
def validate_request_headers(request: IRequest):
"""Validate mandatory HTTP headers according to SGP.22."""
content_type = request.getHeader('Content-Type')
if not content_type or not content_type.startswith('application/json'):
raise ApiError('1.2.1', '2.1', 'Invalid Content-Type header')
admin_protocol = request.getHeader('X-Admin-Protocol')
if admin_protocol and not admin_protocol.startswith('gsma/rsp/v'):
raise ApiError('1.2.2', '2.1', 'Unsupported X-Admin-Protocol version')
def get_eum_certificate_variant(eum_cert) -> str:
"""Determine EUM certificate variant by checking Certificate Policies extension.
Returns 'O' for old variant, or 'NEW' for Ov3/A/B/C variants."""
try:
cert_policies_ext = eum_cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.CERTIFICATE_POLICIES
)
for policy in cert_policies_ext.value:
policy_oid = policy.policy_identifier.dotted_string
logger.debug(f"Found certificate policy: {policy_oid}")
if policy_oid == '2.23.146.1.2.1.2':
logger.debug("Detected EUM certificate variant: O (old)")
return 'O'
elif policy_oid == '2.23.146.1.2.1.0.0.0':
logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)")
return 'NEW'
except x509.ExtensionNotFound:
logger.debug("No Certificate Policies extension found")
except Exception as e:
logger.debug(f"Error checking certificate policies: {e}")
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
"""Extract permitted IINs from EUM certificate using the appropriate method
based on certificate variant (O vs Ov3/A/B/C).
Returns list of permitted IINs (basically prefixes that valid EIDs must start with)."""
# Determine certificate variant first
cert_variant = get_eum_certificate_variant(eum_cert)
permitted_iins = []
if cert_variant == 'O':
# Old variant - use nameConstraints extension
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
else:
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
unique_iins = list(set(permitted_iins))
logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}")
return unique_iins
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
"""Parse the GSMA permittedEins extension using correct ASN.1 structure.
PermittedEins ::= SEQUENCE OF PrintableString
Each string contains an IIN (Issuer Identification Number) - a prefix of valid EIDs."""
permitted_iins = []
try:
permitted_eins_oid = x509.ObjectIdentifier('2.23.146.1.2.2.0') # sgp26: 2.23.146.1.2.2.0 = ASN1:SEQUENCE:permittedEins
for ext in eum_cert.extensions:
if ext.oid == permitted_eins_oid:
logger.debug(f"Found GSMA permittedEins extension: {ext.oid}")
# Get the DER-encoded extension value
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
if isinstance(ext_der, bytes):
try:
permitted_eins_schema = """
PermittedEins DEFINITIONS ::= BEGIN
PermittedEins ::= SEQUENCE OF PrintableString
END
"""
decoder = asn1tools.compile_string(permitted_eins_schema)
decoded_strings = decoder.decode('PermittedEins', ext_der)
for iin_string in decoded_strings:
# Each string contains an IIN -> prefix of euicc EID
iin_clean = iin_string.strip().upper()
# IINs is 8 chars per sgp22, var len according to sgp29, fortunately we don't care
if (len(iin_clean) == 8 and
all(c in '0123456789ABCDEF' for c in iin_clean) and
len(iin_clean) % 2 == 0):
permitted_iins.append(iin_clean)
logger.debug(f"Found permitted IIN (GSMA): {iin_clean}")
else:
logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
except Exception as e:
logger.debug(f"Error parsing GSMA permittedEins extension: {e}")
except Exception as e:
logger.debug(f"Error accessing GSMA certificate extensions: {e}")
return permitted_iins
def _parse_name_constraints_eins(eum_cert) -> List[str]:
"""Parse permitted IINs from nameConstraints extension (variant O)."""
permitted_iins = []
try:
# Look for nameConstraints extension
name_constraints_ext = eum_cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.NAME_CONSTRAINTS
)
name_constraints = name_constraints_ext.value
# Check permittedSubtrees for IIN constraints
if name_constraints.permitted_subtrees:
for subtree in name_constraints.permitted_subtrees:
if isinstance(subtree, x509.DirectoryName):
for attribute in subtree.value:
# IINs for O in serialNumber
if attribute.oid == x509.oid.NameOID.SERIAL_NUMBER:
serial_value = attribute.value.upper()
# sgp22 8, sgp29 var len, fortunately we don't care
if (len(serial_value) == 8 and
all(c in '0123456789ABCDEF' for c in serial_value) and
len(serial_value) % 2 == 0):
permitted_iins.append(serial_value)
logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
except x509.ExtensionNotFound:
logger.debug("No nameConstraints extension found")
except Exception as e:
logger.debug(f"Error parsing nameConstraints: {e}")
return permitted_iins
def validate_eid_range(eid: str, eum_cert) -> bool:
"""Validate that EID is within the permitted EINs of the EUM certificate."""
if not eid or len(eid) != 32:
logger.debug(f"Invalid EID format: {eid}")
return False
try:
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
if not permitted_eins:
logger.debug("Warning: No permitted EINs found in EUM certificate")
return False
eid_normalized = eid.upper()
logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
for permitted_ein in permitted_eins:
if eid_normalized.startswith(permitted_ein):
logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
return True
logger.debug(f"EID {eid_normalized} is not in any permitted EIN list")
return False
except Exception as e:
logger.debug(f"Error validating EID: {e}")
return False
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
r = {'subjectCode': subject_code, 'reasonCode': reason_code }
if subject_id:
@@ -72,12 +335,6 @@ def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_da
if status_code_data:
js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from cryptography import x509
def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
"""convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
@@ -125,22 +382,36 @@ class SmDppHttpServer:
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
"""Find CI certificate for given key identifier."""
for cert in self.ci_certs:
print("cert: %s" % cert)
logger.debug("cert: %s" % cert)
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
print(subject_exts)
logger.debug(subject_exts)
subject_pkid = subject_exts[0].value
print(subject_pkid)
logger.debug(subject_pkid)
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
return cert
return None
def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
def validate_certificate_chain_for_verification(self, euicc_ci_pkid_list: List[bytes]) -> bool:
"""Validate that SM-DP+ has valid certificate chains for the given CI PKIDs."""
for ci_pkid in euicc_ci_pkid_list:
ci_cert = self.ci_get_cert_for_pkid(ci_pkid)
if ci_cert:
# Check if our DPauth certificate chains to this CI
try:
cs = CertificateSet(ci_cert)
cs.verify_cert_chain(self.dp_auth.cert)
return True
except VerifyError:
continue
return False
def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False):
self.server_hostname = server_hostname
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
self.ci_certs = self.load_certs_from_path(ci_certs_path)
# load DPauth cert + key
self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
cert_dir = os.path.join(DATA_DIR, 'certs')
cert_dir = common_cert_path
if use_brainpool:
self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
@@ -155,7 +426,15 @@ class SmDppHttpServer:
else:
self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
if in_memory:
self.rss = rsp.RspSessionStore(in_memory=True)
logger.info("Using in-memory session storage")
else:
# Use different session database files for BRP and NIST to avoid file locking during concurrent runs
session_db_suffix = "BRP" if use_brainpool else "NIST"
db_path = os.path.join(DATA_DIR, f"sm-dp-sessions-{session_db_suffix}")
self.rss = rsp.RspSessionStore(filename=db_path, in_memory=False)
logger.info(f"Using file-based session storage: {db_path}")
@app.handle_errors(ApiError)
def handle_apierror(self, request: IRequest, failure):
@@ -179,11 +458,10 @@ class SmDppHttpServer:
functionality, such as JSON decoding/encoding and debug-printing."""
@functools.wraps(func)
def _api_wrapper(self, request: IRequest):
# TODO: evaluate User-Agent + X-Admin-Protocol header
# TODO: reject any non-JSON Content-type
validate_request_headers(request)
content = json.loads(request.content.read())
print("Rx JSON: %s" % json.dumps(content))
logger.debug("Rx JSON: %s" % json.dumps(content))
set_headers(request)
output = func(self, request, content)
@@ -191,7 +469,7 @@ class SmDppHttpServer:
return ''
build_resp_header(output)
print("Tx JSON: %s" % json.dumps(output))
logger.debug("Tx JSON: %s" % json.dumps(output))
return json.dumps(output)
return _api_wrapper
@@ -210,7 +488,7 @@ class SmDppHttpServer:
euiccInfo1_bin = b64decode(content['euiccInfo1'])
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
print("Rx euiccInfo1: %s" % euiccInfo1)
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
#euiccInfo1['svn']
# TODO: If euiccCiPKIdListForSigningV3 is present ...
@@ -218,6 +496,12 @@ class SmDppHttpServer:
pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
# Validate that SM-DP+ supports certificate chains for verification
verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
ci_cert = None
for x in pkid_list:
@@ -232,13 +516,6 @@ class SmDppHttpServer:
if not ci_cert:
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
# TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
# * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
# supported by the eUICC (indicated by euiccCiPKIdListForVerification).
# * Using a certificate chain that the eUICC and the LPA both support:
#euiccInfo1['euiccCiPKIdListForVerification']
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
# SHALL be unique within the scope and lifetime of each SM-DP+.
transactionId = uuid.uuid4().hex.upper()
@@ -254,9 +531,9 @@ class SmDppHttpServer:
'serverAddress': self.server_hostname,
'serverChallenge': serverChallenge,
}
print("Tx serverSigned1: %s" % serverSigned1)
logger.debug("Tx serverSigned1: %s" % serverSigned1)
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
output = {}
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
@@ -285,7 +562,7 @@ class SmDppHttpServer:
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
print("Rx %s: %s" % authenticateServerResp)
logger.debug("Rx %s: %s" % authenticateServerResp)
if authenticateServerResp[0] == 'authenticateResponseError':
r_err = authenticateServerResp[1]
#r_err['transactionId']
@@ -336,10 +613,12 @@ class SmDppHttpServer:
if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
# TODO: verify EID of eUICC cert is within permitted range of EUM cert
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
print("EID (from eUICC cert): %s" % ss.eid)
logger.debug("EID (from eUICC cert): %s" % ss.eid)
# Verify EID is within permitted range of EUM certificate
if not validate_eid_range(ss.eid, eum_cert):
logger.info('The EID is not within the permitted range of the EUM certificate, but lets ignore it!')
# Verify that the serverChallenge attached to the ongoing RSP session matches the
# serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
@@ -350,6 +629,7 @@ class SmDppHttpServer:
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
# TODO: We really do a very simplistic job here, this needs to be properly implemented later,
# considering all the various cases, profile state, etc.
iccid_str = None
if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
cpca = euiccSigned1['ctxParams1'][1]
matchingId = cpca.get('matchingId', None)
@@ -372,7 +652,7 @@ class SmDppHttpServer:
# there's currently no other option in the ctxParams1 choice, so this cannot happen
raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication')
# FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
# FIXME: we actually want to perform the profile binding herr, and read the profile metadata from the profile
# Put together profileMetadata + _bin
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
@@ -414,7 +694,7 @@ class SmDppHttpServer:
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
print("Rx %s: %s" % prepDownloadResp)
logger.debug("Rx %s: %s" % prepDownloadResp)
if prepDownloadResp[0] == 'downloadResponseError':
r_err = prepDownloadResp[1]
@@ -436,37 +716,37 @@ class SmDppHttpServer:
# store otPK.EUICC.ECKA in session state
ss.euicc_otpk = euiccSigned2['euiccOtpk']
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
# Reference value of CERT.DPpb.ECDDSA
print("curve = %s" % self.dp_pb.get_curve())
logger.debug("curve = %s" % self.dp_pb.get_curve())
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
# extract the public key in (hopefully) the right format for the ES8+ interface
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
ss.host_id = b'mahlzeit'
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
print("shared_secret: %s" % b2h(ss.shared_secret))
logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
# TODO: Check if this order requires a Confirmation Code verification
# Perform actual protection + binding of profile package (or return pre-bound one)
with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
# HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
# HACK: Use empty PPP as we're still debugging the configureISDP step, and we want to avoid
# cluttering the log with stuff happening after the failure
#upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
if False:
# Use random keys
bpp = BoundProfilePackage.from_upp(upp)
else:
# Use sesssion keys
# Use session keys
ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
bpp = BoundProfilePackage.from_ppp(ppp)
@@ -486,22 +766,22 @@ class SmDppHttpServer:
request.setResponseCode(204)
pendingNotification_bin = b64decode(content['pendingNotification'])
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
print("Rx %s: %s" % pendingNotification)
logger.debug("Rx %s: %s" % pendingNotification)
if pendingNotification[0] == 'profileInstallationResult':
profileInstallRes = pendingNotification[1]
pird = profileInstallRes['profileInstallationResultData']
transactionId = b2h(pird['transactionId'])
ss = self.rss.get(transactionId, None)
if ss is None:
print("Unable to find session for transactionId")
return
logger.warning(f"Unable to find session for transactionId: {transactionId}")
return None # Will return HTTP 204 with empty body
profileInstallRes['euiccSignPIR']
# TODO: use original data, don't re-encode?
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
# verify eUICC signature
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
raise Exception('ECDSA signature verification failed on notification')
print("Profile Installation Final Result: ", pird['finalResult'])
logger.debug("Profile Installation Final Result: %s", pird['finalResult'])
# remove session state
del self.rss[transactionId]
elif pendingNotification[0] == 'otherSignedNotification':
@@ -526,7 +806,7 @@ class SmDppHttpServer:
iccid = other_notif.get('iccid', None)
if iccid:
iccid = swap_nibbles(b2h(iccid))
print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
else:
raise ValueError(pendingNotification)
@@ -539,7 +819,7 @@ class SmDppHttpServer:
@rsp_api_wrapper
def cancelSession(self, request: IRequest, content: dict) -> dict:
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
print("Rx JSON: %s" % content)
logger.debug("Rx JSON: %s" % content)
transactionId = content['transactionId']
# Verify that the received transactionId is known and relates to an ongoing RSP session
@@ -549,7 +829,7 @@ class SmDppHttpServer:
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
print("Rx %s: %s" % cancelSessionResponse)
logger.debug("Rx %s: %s" % cancelSessionResponse)
if cancelSessionResponse[0] == 'cancelSessionResponseError':
# FIXME: print some error
@@ -581,15 +861,53 @@ class SmDppHttpServer:
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
#parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP(S) to", default="localhost")
parser.add_argument("-p", "--port", help="TCP port to bind HTTP(S) to", default=443)
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
parser.add_argument("-s", "--nossl", help="disable built in SSL/TLS support", action='store_true', default=False)
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
parser.add_argument("-b", "--brainpool", help="Use Brainpool curves instead of NIST",
action='store_true', default=False)
parser.add_argument("-m", "--in-memory", help="Use ephermal in-memory session storage (for concurrent runs)",
action='store_true', default=False)
args = parser.parse_args()
hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=False)
#hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
hs.app.run(args.host, args.port)
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
common_cert_path = os.path.join(DATA_DIR, args.certdir)
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool)
if(args.nossl):
hs.app.run(args.host, args.port)
else:
curve_type = 'BRP' if args.brainpool else 'NIST'
cert_derpath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.der'
cert_pempath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.pem'
cert_skpath = Path(common_cert_path) / 'DPtls' / f'SK_S_SM_DP_TLS_{curve_type}.pem'
dhparam_path = Path(common_cert_path) / "dhparam2048.pem"
if not dhparam_path.exists():
print("Generating dh params, this takes a few seconds..")
# Generate DH parameters with 2048-bit key size and generator 2
parameters = dh.generate_parameters(generator=2, key_size=2048)
pem_data = parameters.parameter_bytes(encoding=Encoding.PEM,format=ParameterFormat.PKCS3)
with open(dhparam_path, 'wb') as file:
file.write(pem_data)
print("DH params created successfully")
if not cert_pempath.exists():
print("Translating tls server cert from DER to PEM..")
with open(cert_derpath, 'rb') as der_file:
der_cert_data = der_file.read()
cert = x509.load_der_x509_certificate(der_cert_data)
pem_cert = cert.public_bytes(Encoding.PEM) #.decode('utf-8')
with open(cert_pempath, 'wb') as pem_file:
pem_file.write(pem_cert)
SERVER_STRING = f'ssl:{args.port}:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}'
print(SERVER_STRING)
hs.app.run(host=HOSTNAME, port=args.port, endpoint_description=SERVER_STRING)
if __name__ == "__main__":
main(sys.argv)

View File

@@ -1,154 +0,0 @@
#!/usr/bin/python3
from pySim.ota import *
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
from pySim.utils import h2b, h2b
# pre-defined SPI values for use in test cases below
SPI_CC_POR_CIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':True,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_NOCC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'no_rc_cc_ds',
'por': 'por_required'
}
# SJA5 SAMPLE cards provisioned by execute_ipr.py
OTA_KEYSET_SJA5_SAMPLES = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3,
algo_auth='triple_des_cbc2', kid_idx=3,
kic=h2b('300102030405060708090a0b0c0d0e0f'),
kid=h2b('301102030405060708090a0b0c0d0e0f'))
OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
algo_auth='aes_cmac', kid_idx=2,
kic=h2b('200102030405060708090a0b0c0d0e0f'),
kid=h2b('201102030405060708090a0b0c0d0e0f'))
# TS.48 profile on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
algo_auth='aes_cmac', kid_idx=2,
kic=h2b('66778899AABBCCDD1122334455EEFF10'),
kid=h2b('112233445566778899AABBCCDDEEFF10'))
# ISD-R on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128_ISDR = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('B52F9C5938D1C19ED73E1AE772937FD7'),
kid=h2b('3BC696ACD1EEC95A6624F7330D22FC81'))
# ISD-A on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128_ISDA = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('8DAAD1DAAA8D7C9000E3BBED8B7556E7'),
kid=h2b('5392D503AE050DDEAF81AFAEFF275A2B'))
# TODO: AES192
# TODO: AES256
testcases = [
{
'name': '3DES-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
},
'response': {
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
},
'response': {
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-NOCC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
},
'response': {
'encoded_resp': '027100000e0ab0001100000000000000016132',
}
}, {
'name': 'AES128-C2G-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_C2G_AES128_ISDR,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
#'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'apdu': h2b('80ec800300'),
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
},
'response': {
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
}
}
]
for t in testcases:
print()
print("==== TESTCASE: %s" % t['name'])
od = t['ota_keyset']
# RAM: B00000
# SIM RFM: B00010
# USIM RFM: B00011
# ISD-R: 000001
# ECASD: 000002
tar = h2b('000001')
dialect = OtaDialectSms()
outp = dialect.encode_cmd(od, tar, t['spi'], apdu=t['request']['apdu'])
print("result: %s" % b2h(outp))
#assert(b2h(outp) == t['request']['encoded_cmd'])
with_udh = b'\x02\x70\x00' + outp
print("with_udh: %s" % b2h(with_udh))
# processing of the response from the card
da = AddressField('12345678', 'unknown', 'isdn_e164')
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
print("TPDU: %s" % tpdu)
print("tpdu: %s" % b2h(tpdu.to_bytes()))
#assert(b2h(tpdu.to_bytes()) == t['request']['encoded_tpdu'])
r = dialect.decode_resp(od, t['spi'], t['response']['encoded_resp'])
print("RESP: ", r)

View File

@@ -586,7 +586,7 @@ def read_params_csv(opts, imsi=None, iccid=None):
else:
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
# NOTE: We might concider to specify a new CSV field "mnclen" in our
# NOTE: We might consider to specify a new CSV field "mnclen" in our
# CSV files for a better automatization. However, this only makes sense
# when the tools and databases we export our files from will also add
# such a field.

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
#
# Utility to display some informations about a SIM card
# Utility to display some information about a SIM card
#
#
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>

View File

@@ -22,19 +22,25 @@ from typing import List, Optional
import json
import traceback
import re
import cmd2
from packaging import version
from cmd2 import style
import logging
from pySim.log import PySimLogger
from osmocom.utils import auto_uint8
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
if version.parse(cmd2.__version__) < version.parse("2.3.0"):
from cmd2 import fg, bg # pylint: disable=no-name-in-module
RED = fg.red
YELLOW = fg.yellow
LIGHT_RED = fg.bright_red
LIGHT_GREEN = fg.bright_green
else:
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
RED = Fg.RED
YELLOW = Fg.YELLOW
LIGHT_RED = Fg.LIGHT_RED
LIGHT_GREEN = Fg.LIGHT_GREEN
from cmd2 import CommandSet, with_default_category, with_argparser
@@ -63,10 +69,12 @@ from pySim.ts_102_222 import Ts102222Commands
from pySim.gsm_r import DF_EIRENE
from pySim.cat import ProactiveCommand
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
from pySim.card_key_provider import CardKeyProviderCsv
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
from pySim.app import init_card
log = PySimLogger.get("main")
class Cmd2Compat(cmd2.Cmd):
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
@@ -92,15 +100,19 @@ class PysimApp(Cmd2Compat):
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
def __init__(self, card, rs, sl, ch, script=None):
def __init__(self, verbose, card, rs, sl, ch, script=None):
if version.parse(cmd2.__version__) < version.parse("2.0.0"):
kwargs = {'use_ipython': True}
else:
kwargs = {'include_ipy': True}
self.verbose = verbose
self._onchange_verbose('verbose', False, self.verbose);
# pylint: disable=unexpected-keyword-arg
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
auto_load_commands=False, startup_script=script, **kwargs)
PySimLogger.setup(self.poutput, {logging.WARN: YELLOW})
self.intro = style(self.BANNER, fg=RED)
self.default_category = 'pySim-shell built-in commands'
self.card = None
@@ -126,6 +138,9 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
self.add_settable(Settable2Compat('apdu_strict', bool,
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
onchange_cb=self._onchange_apdu_strict))
self.add_settable(Settable2Compat('verbose', bool,
'Enable/disable verbose logging', self,
onchange_cb=self._onchange_verbose))
self.equip(card, rs)
def equip(self, card, rs):
@@ -210,6 +225,13 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
else:
self.card._scc._tp.apdu_strict = False
def _onchange_verbose(self, param_name, old, new):
PySimLogger.set_verbose(new)
if new == True:
PySimLogger.set_level(logging.DEBUG)
else:
PySimLogger.set_level(logging.INFO)
class Cmd2ApduTracer(ApduTracer):
def __init__(self, cmd2_app):
self.cmd2 = cmd2_app
@@ -265,7 +287,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
def do_apdu(self, opts):
"""Send a raw APDU to the card, and print SW + Response.
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
tracked. Dpending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
tracked. Depending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
a different file."""
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
@@ -336,7 +358,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
def _process_card(self, first, script_path):
# Early phase of card initialzation (this part may fail with an exception)
# Early phase of card initialization (this part may fail with an exception)
try:
rs, card = init_card(self.sl)
rc = self.equip(card, rs)
@@ -377,7 +399,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
bulk_script_parser = argparse.ArgumentParser()
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exeption occurs',
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exception occurs',
action='store_true')
bulk_script_parser.add_argument('--tries', type=int, default=2,
help='how many tries before trying the next card')
@@ -477,6 +499,23 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
"""Echo (print) a string on the console"""
self.poutput(' '.join(opts.STRING))
query_card_key_parser = argparse.ArgumentParser()
query_card_key_parser.add_argument('FIELDS', help="fields to query", type=str, nargs='+')
query_card_key_parser.add_argument('--key', help='lookup key (typically \'ICCID\' or \'EID\')',
type=str, required=True)
query_card_key_parser.add_argument('--value', help='lookup key match value (e.g \'8988211000000123456\')',
type=str, required=True)
@cmd2.with_argparser(query_card_key_parser)
@cmd2.with_category(CUSTOM_CATEGORY)
def do_query_card_key(self, opts):
"""Manually query the Card Key Provider"""
result = card_key_provider_get(opts.FIELDS, opts.key, opts.value)
self.poutput("Result:")
if result == {}:
self.poutput(" (none)")
for k in result.keys():
self.poutput(" %s: %s" % (str(k), str(result.get(k))))
@cmd2.with_category(CUSTOM_CATEGORY)
def do_version(self, opts):
"""Print the pySim software version."""
@@ -731,7 +770,7 @@ class PySimCommands(CommandSet):
body = {}
for t in tags:
result = self._cmd.lchan.retrieve_data(t)
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
body[t] = b2h(val)
else:
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
@@ -915,36 +954,53 @@ class Iso7816Commands(CommandSet):
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
return result
@staticmethod
def __select_pin_nr(pin_type:str, pin_nr:int) -> int:
if pin_type:
# pylint: disable=unsubscriptable-object
return pin_names.inverse[pin_type]
return pin_nr
@staticmethod
def __add_pin_nr_to_ArgumentParser(chv_parser):
group = chv_parser.add_mutually_exclusive_group()
group.add_argument('--pin-type',
choices=[x for x in pin_names.values()
if (x.startswith('PIN') or x.startswith('2PIN'))],
help='Specifiy pin type (default is PIN1)')
group.add_argument('--pin-nr', type=auto_uint8, default=0x01,
help='PIN Number, 1=PIN1, 0x81=2PIN1 or custom value (see also TS 102 221, Table 9.3")')
verify_chv_parser = argparse.ArgumentParser()
verify_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(verify_chv_parser)
@cmd2.with_argparser(verify_chv_parser)
def do_verify_chv(self, opts):
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
PIN2."""
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
(data, sw) = self._cmd.lchan.scc.verify_chv(opts.pin_nr, h2b(pin))
2PIN1 (see also TS 102 221 Section 9.5.1 / Table 9.3)."""
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.verify_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV verification successful")
unblock_chv_parser = argparse.ArgumentParser()
unblock_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
help='PUK code value. If none given, CSV file will be queried')
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(unblock_chv_parser)
@cmd2.with_argparser(unblock_chv_parser)
def do_unblock_chv(self, opts):
"""Unblock PIN code using specified PUK code"""
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
puk = self.get_code(opts.PUK, "PUK" + str(opts.pin_nr))
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
puk = self.get_code(opts.PUK, "PUK" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.unblock_chv(
opts.pin_nr, h2b(puk), h2b(new_pin))
pin_nr, h2b(puk), h2b(new_pin))
self._cmd.poutput("CHV unblock successful")
change_chv_parser = argparse.ArgumentParser()
@@ -952,42 +1008,42 @@ class Iso7816Commands(CommandSet):
help='PIN code value. If none given, CSV file will be queried')
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried')
change_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
__add_pin_nr_to_ArgumentParser(change_chv_parser)
@cmd2.with_argparser(change_chv_parser)
def do_change_chv(self, opts):
"""Change PIN code to a new PIN code"""
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.change_chv(
opts.pin_nr, h2b(pin), h2b(new_pin))
pin_nr, h2b(pin), h2b(new_pin))
self._cmd.poutput("CHV change successful")
disable_chv_parser = argparse.ArgumentParser()
disable_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried')
__add_pin_nr_to_ArgumentParser(disable_chv_parser)
@cmd2.with_argparser(disable_chv_parser)
def do_disable_chv(self, opts):
"""Disable PIN code using specified PIN code"""
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
(data, sw) = self._cmd.lchan.scc.disable_chv(opts.pin_nr, h2b(pin))
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.disable_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV disable successful")
enable_chv_parser = argparse.ArgumentParser()
enable_chv_parser.add_argument(
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
__add_pin_nr_to_ArgumentParser(enable_chv_parser)
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
help='PIN code value. If none given, CSV file will be queried')
@cmd2.with_argparser(enable_chv_parser)
def do_enable_chv(self, opts):
"""Enable PIN code using specified PIN code"""
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
(data, sw) = self._cmd.lchan.scc.enable_chv(opts.pin_nr, h2b(pin))
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
(data, sw) = self._cmd.lchan.scc.enable_chv(pin_nr, h2b(pin))
self._cmd.poutput("CHV enable successful")
def do_deactivate_file(self, opts):
@@ -1071,16 +1127,23 @@ argparse_add_reader_args(option_parser)
global_group = option_parser.add_argument_group('General Options')
global_group.add_argument('--script', metavar='PATH', default=None,
help='script with pySim-shell commands to be executed automatically at start-up')
global_group.add_argument('--csv', metavar='FILE',
default=None, help='Read card data from CSV file')
global_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-CSV-column AES transport key')
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
help="Use automatic card handling machine")
global_group.add_argument("--noprompt", help="Run in non interactive mode",
action='store_true', default=False)
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
action='store_true', default=False)
global_group.add_argument("--verbose", help="Enable verbose logging",
action='store_true', default=False)
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
card_key_group.add_argument('--csv', metavar='FILE',
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file')
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key')
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-column AES transport key', dest='column_key')
adm_group = global_group.add_mutually_exclusive_group()
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
@@ -1095,23 +1158,27 @@ option_parser.add_argument("command", nargs='?',
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
help="Optional Arguments for command")
if __name__ == '__main__':
startup_errors = False
opts = option_parser.parse_args()
# Ensure that we are able to print formatted warnings from the beginning.
PySimLogger.setup(print, {logging.WARN: YELLOW})
if (opts.verbose):
PySimLogger.set_verbose(True)
PySimLogger.set_level(logging.DEBUG)
else:
PySimLogger.set_verbose(False)
PySimLogger.set_level(logging.INFO)
# Register csv-file as card data provider, either from specified CSV
# or from CSV file in home directory
csv_column_keys = {}
for par in opts.csv_column_key:
column_keys = {}
for par in opts.column_key:
name, key = par.split(':')
csv_column_keys[name] = key
csv_default = str(Path.home()) + "/.osmocom/pysim/card_data.csv"
if opts.csv:
card_key_provider_register(CardKeyProviderCsv(opts.csv, csv_column_keys))
if os.path.isfile(csv_default):
card_key_provider_register(CardKeyProviderCsv(csv_default, csv_column_keys))
column_keys[name] = key
if os.path.isfile(opts.csv):
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
# Init card reader driver
sl = init_reader(opts, proactive_handler = Proact())
@@ -1127,7 +1194,7 @@ if __name__ == '__main__':
# able to tolerate and recover from that.
try:
rs, card = init_card(sl, opts.skip_card_init)
app = PysimApp(card, rs, sl, ch)
app = PysimApp(opts.verbose, card, rs, sl, ch)
except:
startup_errors = True
print("Card initialization (%s) failed with an exception:" % str(sl))
@@ -1139,7 +1206,7 @@ if __name__ == '__main__':
print(" it should also be noted that some readers may behave strangely when no card")
print(" is inserted.)")
print("")
app = PysimApp(None, None, sl, ch)
app = PysimApp(opts.verbose, None, None, sl, ch)
# If the user supplies an ADM PIN at via commandline args authenticate
# immediately so that the user does not have to use the shell commands

View File

@@ -84,7 +84,7 @@ def tcp_connected_callback(p: protocol.Protocol):
logger.error("%s: connected!" % p)
class ProactChannel:
"""Representation of a single proective channel."""
"""Representation of a single protective channel."""
def __init__(self, channels: 'ProactChannels', chan_nr: int):
self.channels = channels
self.chan_nr = chan_nr

View File

@@ -23,7 +23,6 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
from pySim.apdu_source.stdin_hex import StdinHexApduSource
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
@@ -33,11 +32,10 @@ logger = colorlog.getLogger()
# merge all of the command sets into one global set. This will override instructions,
# the one from the 'last' set in the addition below will prevail.
from pySim.apdu.ts_51_011 import ApduCommands as SimApduCommands
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
ApduCommands = SimApduCommands + UiccApduCommands + UsimApduCommands #+ GpApduCommands
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
class DummySimLink(LinkBase):
@@ -153,7 +151,7 @@ global_group.add_argument('--no-suppress-select', action='store_false', dest='su
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
help="""
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
information that was not already received in resposne to the most recent SEELCT.""")
information that was not already received in response to the most recent SEELCT.""")
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
help="""Show the raw APDU in addition to its parsed form.""")
@@ -190,11 +188,7 @@ parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
Read APDUs from a TCA Loader log file.""")
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
help='Name of te log file to be read')
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
Read APDUs as hex-string from stdin.""")
help='Name of the log file to be read')
if __name__ == '__main__':
@@ -211,8 +205,6 @@ if __name__ == '__main__':
s = PysharkGsmtapPcap(opts.pcap_file)
elif opts.source == 'tca-loader-log':
s = TcaLoaderLogApduSource(opts.log_file)
elif opts.source == 'stdin-hex':
s = StdinHexApduSource()
else:
raise ValueError("unsupported source %s", opts.source)

View File

@@ -1,339 +0,0 @@
# coding=utf-8
"""APDU definitions/decoders of 3GPP TS 51.011, the classic SIM spec.
(C) 2022 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
from construct import GreedyRange, Struct
from pySim.construct import *
from pySim.filesystem import *
from pySim.runtime import RuntimeLchan
from pySim.apdu import ApduCommand, ApduCommandSet
from typing import Optional, Dict, Tuple
logger = logging.getLogger(__name__)
# TS 51.011 Section 9.2.1
class SimSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['A0']):
_apdu_case = 4
def process_on_lchan(self, lchan: RuntimeLchan):
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
for file in path:
file_hex = b2h(file)
sels = lchan.selected_file.get_selectables(['FIDS'])
if file_hex in sels:
if self.successful:
#print("\tSELECT %s" % sels[file_hex])
lchan.selected_file = sels[file_hex]
else:
#print("\tSELECT %s FAILED" % sels[file_hex])
pass
continue
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
if len(self.cmd_data) != 2:
raise ValueError('Expecting a 2-byte FID')
# decode the SELECT response
if self.successful:
self.file = lchan.selected_file
if 'body' in self.rsp_dict:
# not every SELECT is asking for the FCP in response...
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
return None
# TS 51.011 Section 9.2.2
class SimStatus(ApduCommand, n='STATUS', ins=0xF2, cla=['A0']):
_apdu_case = 2
def process_on_lchan(self, lchan):
if self.successful:
if 'body' in self.rsp_dict:
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
def _decode_binary_p1p2(p1, p2) -> Dict:
ret = {}
if p1 & 0x80:
ret['file'] = 'sfi'
ret['sfi'] = p1 & 0x1f
ret['offset'] = p2
else:
ret['file'] = 'currently_selected_ef'
ret['offset'] = ((p1 & 0x7f) << 8) & p2
return ret
# TS 51.011 Section 9.2.3 / 31.101
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short reads
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.4 / 31.101
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short writes
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
def _decode_record_p1p2(p1, p2):
ret = {}
ret['record_number'] = p1
if p2 >> 3 == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = p2 >> 3
mode = p2 & 0x7
if mode == 2:
ret['mode'] = 'next_record'
elif mode == 3:
ret['mode'] = 'previous_record'
elif mode == 8:
ret['mode'] = 'absolute_current'
return ret
# TS 51.011 Section 9.2.5
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.6
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
# TS 51.011 Section 9.2.7
class Seek(ApduCommand, n='SEEK', ins=0xA2, cla=['A0']):
_apdu_case = 4
_construct_rsp = GreedyRange(Int8ub)
def _decode_p1p2(self):
ret = {}
sfi = self.p2 >> 3
if sfi == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = sfi
mode = self.p2 & 0x7
if mode in [0x4, 0x5]:
if mode == 0x4:
ret['mode'] = 'forward_search'
else:
ret['mode'] = 'backward_search'
ret['record_number'] = self.p1
self.col_id = '%02u' % ret['record_number']
elif mode == 6:
ret['mode'] = 'enhanced_search'
# TODO: further decode
elif mode == 7:
ret['mode'] = 'proprietary_search'
return ret
def _decode_cmd(self):
ret = self._decode_p1p2()
if self.cmd_data:
if ret['mode'] == 'enhanced_search':
ret['search_indication'] = b2h(self.cmd_data[:2])
ret['search_string'] = b2h(self.cmd_data[2:])
else:
ret['search_string'] = b2h(self.cmd_data)
return ret
def process_on_lchan(self, lchan):
self._determine_file(lchan)
return self.to_dict()
# TS 51.011 Section 9.2.8
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['A0']):
_apdu_case = 4
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
# TS 51.011 Section 9.2.9
class VerifyChv(ApduCommand, n='VERIFY CHV', ins=0x20, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
@staticmethod
def _pin_process(apdu):
processed = {
'scope': apdu.cmd_dict['p2']['scope'],
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
}
if apdu.lc == 0:
# this is just a question on the counters remaining
processed['mode'] = 'check_remaining_attempts'
else:
processed['pin'] = b2h(apdu.cmd_data)
if apdu.sw[0] == 0x63:
processed['remaining_attempts'] = apdu.sw[1] & 0xf
return processed
@staticmethod
def _pin_is_success(sw):
if sw[0] == 0x63:
return True
else:
return False
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.10
class ChangeChv(ApduCommand, n='CHANGE CHV', ins=0x24, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.11
class DisableChv(ApduCommand, n='DISABLE CHV', ins=0x26, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.12
class EnableChv(ApduCommand, n='ENABLE CHV', ins=0x28, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.13
class UnblockChv(ApduCommand, n='UNBLOCK CHV', ins=0x2C, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.14
class Invalidate(ApduCommand, n='INVALIDATE', ins=0x04, cla=['A0']):
_apdu_case = 1
_construct_p1 = BitStruct(BitsInteger(4),
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
path_from_mf=8, path_from_current_df=9))
# TS 51.011 Section 9.2.15
class Rehabilitate(ApduCommand, n='REHABILITATE', ins=0x44, cla=['A0']):
_apdu_case = 1
_construct_p1 = Invalidate._construct_p1
# TS 51.011 Section 9.2.16
class RunGsmAlgorithm(ApduCommand, n='RUN GSM ALGORITHM', ins=0x88, cla=['A0']):
_apdu_case = 4
_construct = Struct('rand'/Bytes(16))
_construct_rsp = Struct('sres'/Bytes(4), 'kc'/Bytes(8))
# TS 51.011 Section 9.2.17
class Sleep(ApduCommand, n='SLEEP', ins=0xFA, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.18
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.19
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['A0']):
_apdu_case = 3
# TS 51.011 Section 9.2.20
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['A0']):
_apdu_case = 4
# TS 51.011 Section 9.2.21
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.22
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['A0']):
_apdu_case = 3
ApduCommands = ApduCommandSet('TS 51.011', cmds=[SimSelect, SimStatus, ReadBinary, UpdateBinary, ReadRecord,
UpdateRecord, Seek, Increase, VerifyChv, ChangeChv, DisableChv,
EnableChv, UnblockChv, Invalidate, Rehabilitate, RunGsmAlgorithm,
Sleep, GetResponse, TerminalProfile, Envelope, Fetch, TerminalResponse])

View File

@@ -1,39 +0,0 @@
# coding=utf-8
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.utils import h2b
from pySim.gsmtap import GsmtapSource
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
class StdinHexApduSource(ApduSource):
"""ApduSource for reading apdu hex-strings from stdin."""
def read_packet(self) -> PacketType:
while True:
command = input("C-APDU >")
response = '9000'
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))
raise StopIteration

View File

@@ -31,7 +31,6 @@ from pySim.exceptions import SwMatchError
# CardModel is created, which will add the ATR-based matching and
# calling of SysmocomSJA2.add_files. See CardModel.apply_matching_models
import pySim.sysmocom_sja2
#import pySim.sysmocom_euicc1
# we need to import these modules so that the various sub-classes of
# CardProfile are created, which will be used in init_card() to iterate

View File

@@ -317,7 +317,7 @@ class ADF_ARAM(CardADF):
store_ref_ar_do_parse = argparse.ArgumentParser()
# REF-DO
store_ref_ar_do_parse.add_argument(
'--device-app-id', required=True, help='Identifies the specific device application that the rule appplies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
'--device-app-id', required=True, help='Identifies the specific device application that the rule applies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
aid_grp.add_argument(
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
@@ -399,7 +399,7 @@ class ADF_ARAM(CardADF):
sw_aram = {
'ARA-M': {
'6381': 'Rule successfully stored but an access rule already exists',
'6382': 'Rule successfully stored bu contained at least one unknown (discarded) BER-TLV',
'6382': 'Rule successfully stored but contained at least one unknown (discarded) BER-TLV',
'6581': 'Memory Problem',
'6700': 'Wrong Length in Lc',
'6981': 'DO is not supported by the ARA-M/ARA-C',

View File

@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
operation with pySim-shell.
"""
# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier, Harald Welte
@@ -31,128 +31,161 @@ operation with pySim-shell.
from typing import List, Dict, Optional
from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h
from pySim.log import PySimLogger
import abc
import csv
import logging
log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider']
# well-known groups of columns relate to a given functionality. This avoids having
# to specify the same transport key N number of times, if the same key is used for multiple
# fields of one group, like KIC+KID+KID of one SD.
CRYPT_GROUPS = {
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
class CardKeyFieldCryptor:
"""
A Card key field encryption class that may be used by Card key provider implementations to add support for
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.).
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key"
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt
the key material on demand.
"""
# well-known groups of columns relate to a given functionality. This avoids having
# to specify the same transport key N number of times, if the same key is used for multiple
# fields of one group, like KIC+KID+KID of one SD.
__CRYPT_GROUPS = {
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
}
class CardKeyProvider(abc.ABC):
"""Base class, not containing any concrete implementation."""
VALID_KEY_FIELD_NAMES = ['ICCID', 'EID', 'IMSI' ]
# check input parameters, but do nothing concrete yet
def _verify_get_data(self, fields: List[str] = [], key: str = 'ICCID', value: str = "") -> Dict[str, str]:
"""Verify multiple fields for identified card.
Args:
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
key : look-up key to identify card data, such as 'ICCID'
value : value for look-up key to identify card data
Returns:
dictionary of {field, value} strings for each requested field from 'fields'
"""
if key not in self.VALID_KEY_FIELD_NAMES:
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
(key, str(self.VALID_KEY_FIELD_NAMES)))
return {}
def get_field(self, field: str, key: str = 'ICCID', value: str = "") -> Optional[str]:
"""get a single field from CSV file using a specified key/value pair"""
fields = [field]
result = self.get(fields, key, value)
return result.get(field)
@abc.abstractmethod
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
"""Get multiple card-individual fields for identified card.
Args:
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
key : look-up key to identify card data, such as 'ICCID'
value : value for look-up key to identify card data
Returns:
dictionary of {field, value} strings for each requested field from 'fields'
"""
class CardKeyProviderCsv(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified CSV file.
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
known as "transport key" (see GSMA FS.28)."""
IV = b'\x23' * 16
csv_file = None
filename = None
def __init__(self, filename: str, transport_keys: dict):
"""
Args:
filename : file name (path) of CSV file containing card-individual key/data
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
"""
self.csv_file = open(filename, 'r')
if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % filename)
self.filename = filename
self.transport_keys = self.process_transport_keys(transport_keys)
__IV = b'\x23' * 16
@staticmethod
def process_transport_keys(transport_keys: dict):
def __dict_keys_to_upper(d: dict) -> dict:
return {k.upper():v for k,v in d.items()}
@staticmethod
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
new_dict = {}
for name, key in transport_keys.items():
if name in CRYPT_GROUPS:
for field in CRYPT_GROUPS[name]:
if name in crypt_groups:
for field in crypt_groups[name]:
new_dict[field] = key
else:
new_dict[name] = key
return new_dict
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""decrypt a single field, if we have a transport key for the field of that name."""
if not field_name in self.transport_keys:
def __init__(self, transport_keys: dict):
"""
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
Args:
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
"""
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
self.__CRYPT_GROUPS)
for name, key in self.transport_keys.items():
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as plaintext and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return encrypted_val
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.decrypt(h2b(encrypted_val)))
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
super()._verify_get_data(fields, key, value)
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
"""
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as non sensitive and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return plaintext_val
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.encrypt(h2b(plaintext_val)))
class CardKeyProvider(abc.ABC):
"""Base class, not containing any concrete implementation."""
@abc.abstractmethod
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
"""
Get multiple card-individual fields for identified card. This method should not fail with an exception in
case the entry, columns or even the key column itsself is not found.
Args:
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
key : look-up key to identify card data, such as 'ICCID'
value : value for look-up key to identify card data
Returns:
dictionary of {field : value, ...} strings for each requested field from 'fields'. In case nothing is
fond None shall be returned.
"""
def __str__(self):
return type(self).__name__
class CardKeyProviderCsv(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified CSV file."""
def __init__(self, csv_filename: str, transport_keys: dict):
"""
Args:
csv_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
"""
self.csv_file = open(csv_filename, 'r')
if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
self.csv_filename = csv_filename
self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
self.csv_file.seek(0)
cr = csv.DictReader(self.csv_file)
if not cr:
raise RuntimeError(
"Could not open DictReader for CSV-File '%s'" % self.filename)
raise RuntimeError("Could not open DictReader for CSV-File '%s'" % self.csv_filename)
cr.fieldnames = [field.upper() for field in cr.fieldnames]
rc = {}
if key not in cr.fieldnames:
return None
return_dict = {}
for row in cr:
if row[key] == value:
for f in fields:
if f in row:
rc.update({f: self._decrypt_field(f, row[f])})
return_dict.update({f: self.crypt.decrypt_field(f, row[f])})
else:
raise RuntimeError("CSV-File '%s' lacks column '%s'" %
(self.filename, f))
return rc
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.csv_filename, f))
if return_dict == {}:
return None
return return_dict
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
@@ -163,11 +196,11 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
provider_list : override the list of providers from the global default
"""
if not isinstance(provider, CardKeyProvider):
raise ValueError("provider is not a card data provier")
raise ValueError("provider is not a card data provider")
provider_list.append(provider)
def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
def card_key_provider_get(fields: list[str], key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
"""Query all registered card data providers for card-individual [key] data.
Args:
@@ -178,17 +211,21 @@ def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_p
Returns:
dictionary of {field, value} strings for each requested field from 'fields'
"""
key = key.upper()
fields = [f.upper() for f in fields]
for p in provider_list:
if not isinstance(p, CardKeyProvider):
raise ValueError(
"provider list contains element which is not a card data provier")
raise ValueError("Provider list contains element which is not a card data provider")
log.debug("Searching for card key data (key=%s, value=%s, provider=%s)" % (key, value, str(p)))
result = p.get(fields, key, value)
if result:
log.debug("Found card data: %s" % (str(result)))
return result
return {}
raise ValueError("Unable to find card key data (key=%s, value=%s, fields=%s)" % (key, value, str(fields)))
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> Optional[str]:
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> str:
"""Query all registered card data providers for a single field.
Args:
@@ -199,11 +236,7 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
Returns:
dictionary of {field, value} strings for the requested field
"""
for p in provider_list:
if not isinstance(p, CardKeyProvider):
raise ValueError(
"provider list contains element which is not a card data provier")
result = p.get_field(field, key, value)
if result:
return result
return None
fields = [field]
result = card_key_provider_get(fields, key, value, card_key_providers)
return result.get(field.upper())

View File

@@ -112,6 +112,8 @@ class UiccCardBase(SimCardBase):
def probe(self) -> bool:
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
ef_dir = EF_DIR()
# select MF first
self.file_exists("3f00")
return self.file_exists(ef_dir.fid)
def read_aids(self) -> List[Hexstr]:

View File

@@ -2,7 +2,7 @@
mainly) ETSI TS 102 223, ETSI TS 101 220 and USIM Application Toolkit (SAT)
as described in 3GPP TS 31.111."""
# (C) 2021-2024 by Harald Welte <laforge@osmocom.org>
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -316,19 +316,19 @@ class FileList(COMPR_TLV_IE, tag=0x92):
_construct = Struct('number_of_files'/Int8ub,
'files'/GreedyRange(FileId))
# TS 102 223 Secton 8.19
# TS 102 223 Section 8.19
class LocationInformation(COMPR_TLV_IE, tag=0x93):
pass
# TS 102 223 Secton 8.20
# TS 102 223 Section 8.20
class IMEI(COMPR_TLV_IE, tag=0x94):
_construct = BcdAdapter(GreedyBytes)
# TS 102 223 Secton 8.21
# TS 102 223 Section 8.21
class HelpRequest(COMPR_TLV_IE, tag=0x95):
pass
# TS 102 223 Secton 8.22
# TS 102 223 Section 8.22
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
_construct = BcdAdapter(GreedyBytes)

View File

@@ -141,7 +141,7 @@ class SimCardCommands:
Returns:
Tuple of (decoded_data, sw)
"""
cmd = cmd_constr.build(cmd_data) if cmd_data else ''
cmd = cmd_constr.build(cmd_data) if cmd_data else b''
lc = i2h([len(cmd)]) if cmd_data else ''
le = '00' if resp_constr else ''
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
@@ -285,7 +285,7 @@ class SimCardCommands:
return self.send_apdu_checksw(self.cla_byte + "a40304")
def select_adf(self, aid: Hexstr) -> ResTuple:
"""Execute SELECT a given Applicaiton ADF.
"""Execute SELECT a given Application ADF.
Args:
aid : application identifier as hex string
@@ -577,7 +577,7 @@ class SimCardCommands:
Args:
rand : 16 byte random data as hex string (RAND)
autn : 8 byte Autentication Token (AUTN)
autn : 8 byte Authentication Token (AUTN)
context : 16 byte random data ('3g' or 'gsm')
"""
# 3GPP TS 31.102 Section 7.1.2.1

View File

@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
block_nr = self.block_nr
ciphertext = self._encrypt(padded_data)
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
return ciphertext
def decrypt(self, data:bytes) -> bytes:
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
temp_data = self.mac_chain + tag_and_length + data
old_mcv = self.mac_chain
c_mac = self._auth(temp_data)
# DEBUG: Show MAC computation details
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
ret = tag_and_length + data + c_mac
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
return ret
def verify(self, ciphertext: bytes) -> bool:
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
s_enc = out[l:2*l]
s_mac = out[l*2:3*l]
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
return s_enc, s_mac, initial_mac_chaining_value
@@ -231,9 +246,21 @@ class BspInstance:
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
assert tag <= 255
assert len(plaintext) <= self.max_payload_size
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
# DEBUG: Show what we're processing
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
ciphered = self.c_algo.encrypt(plaintext)
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
maced = self.m_algo.auth(tag, ciphered)
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
return maced
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
@@ -255,7 +282,7 @@ class BspInstance:
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
"""MAC a single plaintext TLV. Returns the protected ciphertext."""
assert tag <= 255
assert len(plaintext) < self.max_payload_size
assert len(plaintext) <= self.max_payload_size
maced = self.m_algo.auth(tag, plaintext)
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
self.c_algo.block_nr += 1

View File

@@ -116,7 +116,7 @@ class param:
pass
class Es2PlusApiFunction(JsonHttpApiFunction):
"""Base classs for representing an ES2+ API Function."""
"""Base class for representing an ES2+ API Function."""
pass
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)

View File

@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
from pySim.esim.bsp import BspInstance
from pySim.esim import PMO
import logging
logger = logging.getLogger(__name__)
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
@@ -73,10 +76,11 @@ def gen_replace_session_keys(ppk_enc: bytes, ppk_cmac: bytes, initial_mcv: bytes
class ProfileMetadata:
"""Representation of Profile metadata. Right now only the mandatory bits are
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str):
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str, profile_class = 'operational'):
self.iccid_bin = iccid_bin
self.spn = spn
self.profile_name = profile_name
self.profile_class = profile_class
self.icon = None
self.icon_type = None
self.notifications = []
@@ -102,6 +106,14 @@ class ProfileMetadata:
'serviceProviderName': self.spn,
'profileName': self.profile_name,
}
if self.profile_class == 'test':
smr['profileClass'] = 0
elif self.profile_class == 'provisioning':
smr['profileClass'] = 1
elif self.profile_class == 'operational':
smr['profileClass'] = 2
else:
raise ValueError('Unsupported Profile Class %s' % self.profile_class)
if self.icon:
smr['icon'] = self.icon
smr['iconType'] = self.icon_type
@@ -196,8 +208,12 @@ class BoundProfilePackage(ProfilePackage):
# 'initialiseSecureChannelRequest'
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
# firstSequenceOf87
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
# sequenceOF88
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
if self.ppp: # we have to use session keys

View File

@@ -1,4 +1,4 @@
"""GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5"""
"""GSMA eSIM RSP ES9+ interface according to SGP.22 v2.5"""
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#

View File

@@ -159,7 +159,7 @@ class ApiError(Exception):
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
class JsonHttpApiFunction(abc.ABC):
"""Base classs for representing an HTTP[s] API Function."""
"""Base class for representing an HTTP[s] API Function."""
# the below class variables are expected to be overridden in derived classes
path = None

View File

@@ -90,13 +90,61 @@ class RspSessionState:
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
del state['_smdp_otsk']
del state['_smdp_ot_curve']
# automatically recover all the remainig state
# automatically recover all the remaining state
self.__dict__.update(state)
class RspSessionStore(shelve.DbfilenameShelf):
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
class RspSessionStore:
"""A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
Can be configured to use either file-based storage or in-memory storage.
We use it to store RspSessionState objects indexed by transactionId."""
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
self._in_memory = in_memory
if in_memory:
self._shelf = shelve.Shelf(dict())
else:
if filename is None:
raise ValueError("filename is required for file-based session store")
self._shelf = shelve.open(filename)
# dunder magic
def __getitem__(self, key):
return self._shelf[key]
def __setitem__(self, key, value):
self._shelf[key] = value
def __delitem__(self, key):
del self._shelf[key]
def __contains__(self, key):
return key in self._shelf
def __iter__(self):
return iter(self._shelf)
def __len__(self):
return len(self._shelf)
# everything else
def __getattr__(self, name):
"""Delegate attribute access to the underlying shelf object."""
return getattr(self._shelf, name)
def close(self):
"""Close the session store."""
if hasattr(self._shelf, 'close'):
self._shelf.close()
if self._in_memory:
# For in-memory store, clear the reference
self._shelf = None
def sync(self):
"""Synchronize the cache with the underlying storage."""
if hasattr(self._shelf, 'sync'):
self._shelf.sync()
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This

View File

@@ -21,8 +21,6 @@ import io
import os
from typing import Tuple, List, Optional, Dict, Union
from collections import OrderedDict
from difflib import SequenceMatcher, Match
import asn1tools
import zipfile
from pySim import javacard
@@ -46,29 +44,6 @@ asn1 = compile_asn1_subdir('saip')
logger = logging.getLogger(__name__)
class NonMatch(Match):
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
@classmethod
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
The caller must ensure that the input list is ordered, non-overlapping and only contains
matches at equal offsets in a and b."""
res = []
cur = 0
for match in l:
if match.a != match.b:
return ValueError('only works for equal-offset matches')
assert match.a >= cur
nm_len = match.a - cur
if nm_len > 0:
# there's no point in generating zero-lenth non-matching sections
res.append(cls(a=cur, b=cur, size=nm_len))
cur = match.a + match.size
if size > cur:
res.append(cls(a=cur, b=cur, size=size-cur))
return res
class Naa:
"""A class defining a Network Access Application (NAA)"""
name = None
@@ -169,6 +144,9 @@ class File:
def file_size(self) -> Optional[int]:
"""Return the size of the file in bytes."""
if self.file_type in ['LF', 'CY']:
if self._file_size and self.nb_rec is None and self.rec_len:
self.nb_rec = self._file_size // self.rec_len
return self.nb_rec * self.rec_len
elif self.file_type in ['TR', 'BT']:
return self._file_size
@@ -316,6 +294,13 @@ class File:
dfName = fileDescriptor.get('dfName', None)
if dfName:
self.df_name = dfName
dfName = fileDescriptor.get('dfName', None)
if dfName:
self.df_name = dfName
efFileSize = fileDescriptor.get('efFileSize', None)
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
pefi = fileDescriptor.get('proprietaryEFInfo', {})
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
if securityAttributesReferenced:
@@ -325,13 +310,11 @@ class File:
fdb_dec = fd_dec['file_descriptor_byte']
self.shareable = fdb_dec['shareable']
if fdb_dec['file_type'] == 'working_ef':
efFileSize = fileDescriptor.get('efFileSize', None)
if fd_dec['num_of_rec']:
self.nb_rec = fd_dec['num_of_rec']
if fd_dec['record_len']:
self.rec_len = fd_dec['record_len']
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
if self.rec_len and self.nb_rec == None:
# compute the number of records from file size and record length
self.nb_rec = self._file_size // self.rec_len
@@ -359,7 +342,7 @@ class File:
self.fill_pattern = pefi['fillPattern']
self.fill_pattern_repeat = False
elif fdb_dec['file_type'] == 'df':
# only set it, if an earlier call to from_template() didn't alrady set it, as
# only set it, if an earlier call to from_template() didn't already set it, as
# the template can differentiate between MF, DF and ADF (unlike FDB)
if not self.file_type:
self.file_type = 'DF'
@@ -377,6 +360,9 @@ class File:
fd = self.get_tuplelist_item(l, 'fileDescriptor')
if not fd and not self._template_derived:
raise ValueError("%s: No fileDescriptor found in tuple, and none set by template before" % self)
if self.name == "EF.SUCI_Calc_Info":
breakpoint()
#breakpoint()
if fd:
self.from_fileDescriptor(dict(fd))
# BODY
@@ -411,6 +397,16 @@ class File:
stream = io.BytesIO()
# Providing file content within "fillFileContent" / "fillFileOffset" shall have the same effect as
# creating a file with a fill/repeat pattern and thereafter updating the content via Update.
# Step 0: determine file size
#file_size = self._file_size
#for k, v in l:
# if k != 'fileDescriptor':
# continue
# file_desc = v
# if 'efFileSize' in file_desc:
# file_size = self._decode_file_size(file_desc['efFileSize'])
# Step 1: Fill with pattern from Fcp or Template
if self.fill_pattern:
stream.write(self.expand_fill_pattern())
@@ -431,35 +427,12 @@ class File:
return ValueError("Unknown key '%s' in tuple list" % k)
return stream.getvalue()
def file_content_to_tuples(self, optimize:bool = True) -> List[Tuple]:
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
return []
if not optimize:
# simplistic approach: encode the full file, ignoring the template/default
return [('fillFileContent', self.body)]
# Try to 'compress' the file body, based on the default file contents.
if self.template:
default = self.template.expand_default_value_pattern(length=len(self.body))
if not default:
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
else:
if default == self.body:
# 100% match: return an empty tuple list to make eUICC use the default
return []
sm = SequenceMatcher(a=default, b=self.body)
else:
# no template at all: we can only remove padding
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
matching_blocks = sm.get_matching_blocks()
# we can only make use of matches that have the same offset in 'a' and 'b'
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
ret = []
cur = 0
for block in non_matching_blocks:
ret.append(('fillFileOffset', block.a - cur))
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
return ret
def file_content_to_tuples(self) -> List[Tuple]:
# FIXME: simplistic approach. needs optimization. We should first check if the content
# matches the expanded default value from the template. If it does, return empty list.
# Next, we should compute the diff between the default value and self.body, and encode
# that as a sequence of fillFileOffset and fillFileContent tuples.
return [('fillFileContent', self.body)]
def __str__(self) -> str:
return "File(%s)" % self.pe_name
@@ -475,7 +448,7 @@ class File:
class ProfileElement:
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
but ist more likely sub-classed with a specific class for the specific profile element type, like e.g
but it's more likely sub-classed with a specific class for the specific profile element type, like e.g
ProfileElementHeader, ProfileElementMF, ...
"""
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
@@ -488,7 +461,7 @@ class ProfileElement:
'genericFileManagement': 'gfm-header',
'akaParameter': 'aka-header',
'cdmaParameter': 'cdma-header',
# note how they couldn't even consistently captialize the 'header' suffix :(
# note how they couldn't even consistently capitalize the 'header' suffix :(
'application': 'app-Header',
'pukCodes': 'puk-Header',
'pinCodes': 'pin-Header',
@@ -676,7 +649,7 @@ class FsProfileElement(ProfileElement):
# this is a template that belongs into the [A]DF of another template
# 1) find the PE for the referenced template
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
# 2) resolve te [A]DF that forms the base of that parent PE
# 2) resolve the [A]DF that forms the base of that parent PE
pe_df = parent_pe.files[template.parent.base_df().pe_name].node
self.pe_sequence.cur_df = pe_df
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
@@ -697,7 +670,7 @@ class FsProfileElement(ProfileElement):
self.add_file(file)
def create_file(self, pename: str) -> File:
"""Programatically create a file by its PE-Name."""
"""Programmatically create a file by its PE-Name."""
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
file = File(pename, None, template.files_by_pename.get(pename, None))
self.add_file(file)
@@ -1033,9 +1006,9 @@ class SecurityDomainKey:
self.key_components = key_components
def __repr__(self) -> str:
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=%s, Comp=%s)' % (self.key_version_number,
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=0x%x, Comp=%s)' % (self.key_version_number,
self.key_identifier,
self.key_usage_qualifier,
build_construct(KeyUsageQualifier, self.key_usage_qualifier)[0],
repr(self.key_components))
@classmethod
@@ -1457,7 +1430,7 @@ class ProfileElementHeader(ProfileElement):
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
**kwargs):
"""You would usually initialize an instance either with a "decoded" argument (as read from
a DER-encoded SAIP file via asn1tools), or [some of] the othe arguments in case you're
a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're
constructing a Profile Header from scratch.
Args:
@@ -1610,7 +1583,7 @@ class ProfileElementSequence:
def _rebuild_pes_by_naa(self) -> None:
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
which basically means for every NAA there's a lsit of instances, and each consists
which basically means for every NAA there's a list of instances, and each consists
of a list of a list of PEs."""
self.pres_by_naa = {}
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
@@ -1738,7 +1711,7 @@ class ProfileElementSequence:
i += 1
def get_index_by_pe(self, pe: ProfileElement) -> int:
"""Return a list with the indicies of all instances of PEs of petype."""
"""Return a list with the indices of all instances of PEs of petype."""
ret = []
i = 0
for cur in self.pe_list:
@@ -1759,7 +1732,7 @@ class ProfileElementSequence:
self.insert_at_index(idx+1, pe_new)
def get_index_by_type(self, petype: str) -> List[int]:
"""Return a list with the indicies of all instances of PEs of petype."""
"""Return a list with the indices of all instances of PEs of petype."""
ret = []
i = 0
for pe in self.pe_list:
@@ -1784,7 +1757,7 @@ class ProfileElementSequence:
for service in naa.mandatory_services:
if service in hdr.decoded['eUICC-Mandatory-services']:
del hdr.decoded['eUICC-Mandatory-services'][service]
# remove any associaed mandatory filesystem templates
# remove any associated mandatory filesystem templates
for template in naa.templates:
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
@@ -2088,7 +2061,8 @@ class FsNodeADF(FsNodeDF):
super().__init__(fid, parent, file, name)
def __str__(self):
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name))
# self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
class FsNodeMF(FsNodeDF):
"""The MF (Master File) in the filesystem hierarchy."""

View File

@@ -673,7 +673,7 @@ class FilesUsimDf5GS(ProfileTemplate):
FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]),
FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]),
FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130]),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130], pe_name='ef-supinai'),
FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]),
]
@@ -818,7 +818,7 @@ class FilesIsimOptional(ProfileTemplate):
base_path = Path('ADF.ISIM')
extends = FilesIsimMandatory
files = [
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5]),
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5], pe_name='ef-pcscf'),
FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]),
FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]),
FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]),

View File

@@ -68,7 +68,7 @@ class CheckBasicStructure(ProfileConstraintChecker):
def check_optional_ordering(self, pes: ProfileElementSequence):
"""Check the ordering of optional PEs following the respective mandatory ones."""
# ordering and required depenencies
# ordering and required dependencies
self._is_after_if_exists(pes,'opt-usim', 'usim')
self._is_after_if_exists(pes,'opt-isim', 'isim')
self._is_after_if_exists(pes,'gsm-access', 'usim')

View File

@@ -149,7 +149,7 @@ class CertificateSet:
check_signed(c, self.root_cert)
return
parent_cert = self.intermediate_certs.get(aki, None)
if not aki:
if not parent_cert:
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
check_signed(c, parent_cert)
# if we reach here, we passed (no exception raised)

View File

@@ -39,7 +39,7 @@ from osmocom.utils import h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, i
from osmocom.tlv import bertlv_parse_one
from osmocom.construct import filter_dict, parse_construct, build_construct
from pySim.utils import sw_match
from pySim.utils import sw_match, decomposeATR
from pySim.jsonpath import js_path_modify
from pySim.commands import SimCardCommands
from pySim.exceptions import SwMatchError
@@ -86,7 +86,7 @@ class CardFile:
self.service = service
self.shell_commands = [] # type: List[CommandSet]
# Note: the basic properties (fid, name, ect.) are verified when
# Note: the basic properties (fid, name, etc.) are verified when
# the file is attached to a parent file. See method add_file() in
# class Card DF
@@ -266,7 +266,7 @@ class CardFile:
def get_profile(self):
"""Get the profile associated with this file. If this file does not have any
profile assigned, try to find a file above (usually the MF) in the filesystem
hirarchy that has a profile assigned
hierarchy that has a profile assigned
"""
# If we have a profile set, return it
@@ -679,7 +679,7 @@ class TransparentEF(CardEF):
Args:
fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, lik EF_ICCID
name : Brief name of the file, like EF_ICCID
desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy
size : tuple of (minimum_size, recommended_size)
@@ -982,11 +982,11 @@ class LinFixedEF(CardEF):
Args:
fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, lik EF_ICCID
name : Brief name of the file, like EF_ICCID
desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy
rec_len : Tuple of (minimum_length, recommended_length)
leftpad: On write, data must be padded from the left to fit pysical record length
leftpad: On write, data must be padded from the left to fit physical record length
"""
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
self.rec_len = rec_len
@@ -1422,7 +1422,7 @@ class BerTlvEF(CardEF):
Args:
fid : File Identifier (4 hex digits)
sfid : Short File Identifier (2 hex digits, optional)
name : Brief name of the file, lik EF_ICCID
name : Brief name of the file, like EF_ICCID
desc : Description of the file
parent : Parent CardFile object within filesystem hierarchy
size : tuple of (minimum_size, recommended_size)
@@ -1455,7 +1455,7 @@ class BerTlvEF(CardEF):
export_str += "delete_all\n"
for t in tags:
result = lchan.retrieve_data(t)
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
return export_str.strip()
@@ -1495,7 +1495,7 @@ class CardApplication:
self.name = name
self.adf = adf
self.sw = sw or {}
# back-reference from ADF to Applicaiton
# back-reference from ADF to Application
if self.adf:
self.aid = aid or self.adf.aid
self.adf.application = self
@@ -1545,6 +1545,13 @@ class CardModel(abc.ABC):
if atr == card_atr:
print("Detected CardModel:", cls.__name__)
return True
# if nothing found try to just compare the Historical Bytes of the ATR
card_atr_hb = decomposeATR(card_atr)['hb']
for atr in cls._atrs:
atr_hb = decomposeATR(atr)['hb']
if atr_hb == card_atr_hb:
print("Detected CardModel:", cls.__name__)
return True
return False
@staticmethod
@@ -1565,7 +1572,7 @@ class Path:
p = p.split('/')
elif len(p) and isinstance(p[0], int):
p = ['%04x' % x for x in p]
# make sure internal representation alwas is uppercase only
# make sure internal representation always is uppercase only
self.list = [x.upper() for x in p]
def __str__(self) -> str:

View File

@@ -627,7 +627,7 @@ class ADF_SD(CardADF):
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
kcv = b2h(kcv_bin)
if self._cmd.lchan.scc.scp:
# encrypte key data with DEK of current SCP
# encrypted key data with DEK of current SCP
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
else:
# (for example) during personalization, DEK might not be required)
@@ -755,7 +755,7 @@ class ADF_SD(CardADF):
inst_load_parser = argparse.ArgumentParser()
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
help='AID of the loded file')
help='AID of the loaded file')
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
help='AID of the Security Domain into which the file shalle be added')
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
@@ -845,7 +845,7 @@ class ADF_SD(CardADF):
# TODO:tune chunk_len based on the overhead of the used SCP?
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
# transfer this in vaious chunks to the card
# transfer this in various chunks to the card
total_size = len(remainder)
block_nr = 0
while len(remainder):

View File

@@ -104,4 +104,4 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s upport
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s support

View File

@@ -97,7 +97,7 @@ class CapFile():
raise ValueError("invalid cap file, %s missing!" % required_components[component])
def get_loadfile(self) -> bytes:
"""Get the executeable loadfile as hexstring"""
"""Get the executable loadfile as hexstring"""
# Concatenate all cap file components in the specified order
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
loadfile = self.__component['Header']

View File

@@ -495,7 +495,7 @@ class IsimCard(UiccCardBase):
class MagicSimBase(abc.ABC, SimCard):
"""
Theses cards uses several record based EFs to store the provider infos,
These cards uses several record based EFs to store the provider infos,
each possible provider uses a specific record number in each EF. The
indexes used are ( where N is the number of providers supported ) :
- [2 .. N+1] for the operator name
@@ -644,7 +644,7 @@ class MagicSim(MagicSimBase):
class FakeMagicSim(SimCard):
"""
Theses cards have a record based EF 3f00/000c that contains the provider
These cards have a record based EF 3f00/000c that contains the provider
information. See the program method for its format. The records go from
1 to N.
"""

View File

@@ -296,7 +296,7 @@ def dec_addr_tlv(hexstr):
elif addr_type == 0x01: # IPv4
# Skip address tye byte i.e. first byte in value list
# Skip the unused byte in Octect 4 after address type byte as per 3GPP TS 31.102
# Skip the unused byte in Octet 4 after address type byte as per 3GPP TS 31.102
ipv4 = tlv[2][2:]
content = '.'.join(str(x) for x in ipv4)
return (content, '01')

125
pySim/log.py Normal file
View File

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
""" pySim: Logging
"""
#
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
from cmd2 import style
class _PySimLogHandler(logging.Handler):
def __init__(self, log_callback):
super().__init__()
self.log_callback = log_callback
def emit(self, record):
formatted_message = self.format(record)
self.log_callback(formatted_message, record)
class PySimLogger:
"""
Static class to centralize the log output of PySim applications. This class can be used to print log messages from
any pySim module. Configuration of the log behaviour (see setup and set_ methods) is entirely optional. In case no
print callback is set (see setup method), the logger will pass the log messages directly to print() without applying
any formatting to the original log message.
"""
LOG_FMTSTR = "%(levelname)s: %(message)s"
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
__formatter = logging.Formatter(LOG_FMTSTR)
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
# No print callback by default, means that log messages are passed directly to print()
print_callback = None
# No specific color scheme by default
colors = {}
# The logging default is non-verbose logging on logging level DEBUG. This is a safe default that works for
# applications that ignore the presence of the PySimLogger class.
verbose = False
logging.root.setLevel(logging.DEBUG)
def __init__(self):
raise RuntimeError('static class, do not instantiate')
@staticmethod
def setup(print_callback = None, colors:dict = {}):
"""
Set a print callback function and color scheme. This function call is optional. In case this method is not
called, default settings apply.
Args:
print_callback : A callback function that accepts the resulting log string as input. The callback should
have the following format: print_callback(message:str)
colors : An optional dict through which certain log levels can be assigned a color.
(e.g. {logging.WARN: YELLOW})
"""
PySimLogger.print_callback = print_callback
PySimLogger.colors = colors
@staticmethod
def set_verbose(verbose:bool = False):
"""
Enable/disable verbose logging. (has no effect in case no print callback is set, see method setup)
Args:
verbose: verbosity (True = verbose logging, False = normal logging)
"""
PySimLogger.verbose = verbose;
@staticmethod
def set_level(level:int = logging.DEBUG):
"""
Set the logging level.
Args:
level: Logging level, valis log leves are: DEBUG, INFO, WARNING, ERROR and CRITICAL
"""
logging.root.setLevel(level)
@staticmethod
def _log_callback(message, record):
if not PySimLogger.print_callback:
# In case no print callback has been set display the message as if it were printed trough a normal
# python print statement.
print(record.message)
else:
# When a print callback is set, use it to display the log line. Apply color if the API user chose one
if PySimLogger.verbose:
formatted_message = logging.Formatter.format(PySimLogger.__formatter_verbose, record)
else:
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
color = PySimLogger.colors.get(record.levelno)
if color:
PySimLogger.print_callback(style(formatted_message, fg = color))
else:
PySimLogger.print_callback(formatted_message)
@staticmethod
def get(log_facility: str):
"""
Set up and return a new python logger object
Args:
log_facility : Name of log facility (e.g. "MAIN", "RUNTIME"...)
"""
logger = logging.getLogger(log_facility)
handler = _PySimLogHandler(log_callback=PySimLogger._log_callback)
logger.addHandler(handler)
return logger

View File

@@ -302,7 +302,7 @@ class OtaAlgoAuthDES3(OtaAlgoAuth):
class OtaAlgoCryptAES(OtaAlgoCrypt):
name = 'AES'
enum_name = 'aes_cbc'
blocksize = 16
blocksize = 16 # TODO: is this needed?
def _encrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.encrypt(data)
@@ -356,20 +356,20 @@ class OtaDialectSms(OtaDialect):
# CHL + SPI (+ KIC + KID)
part_head = self.hdr_construct.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
print("part_head: %s" % b2h(part_head))
#print("part_head: %s" % b2h(part_head))
# CNTR + PCNTR (CNTR not used)
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
print("part_cnt: %s" % b2h(part_cnt))
#print("part_cnt: %s" % b2h(part_cnt))
envelope_data = part_head + part_cnt + apdu
print("envelope_data: %s" % b2h(envelope_data))
#print("envelope_data: %s" % b2h(envelope_data))
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
# CPL from and including CPI to end of secured data, including any padding for ciphering
cpl = len(envelope_data) + len_sig
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
print("envelope_data with cpl: %s" % b2h(envelope_data))
#print("envelope_data with cpl: %s" % b2h(envelope_data))
if spi['rc_cc_ds'] == 'cc':
cc = otak.auth.sign(envelope_data)
@@ -383,7 +383,7 @@ class OtaDialectSms(OtaDialect):
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
print("envelope_data with sig: %s" % b2h(envelope_data))
#print("envelope_data with sig: %s" % b2h(envelope_data))
# encrypt as needed
if spi['ciphering']: # ciphering is requested
@@ -395,7 +395,7 @@ class OtaDialectSms(OtaDialect):
else:
envelope_data = part_head + envelope_data
print("envelope_data: %s" % b2h(envelope_data))
#print("envelope_data: %s" % b2h(envelope_data))
if len(envelope_data) > 140:
raise ValueError('Cannot encode command in a single SMS; Fragmentation not implemented')
@@ -492,7 +492,6 @@ class OtaDialectSms(OtaDialect):
else:
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
print(res)
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
if res.response_status == 'por_ok' and len(res['secured_data']):
dec = CompactRemoteResp.parse(res['secured_data'])

View File

@@ -110,7 +110,7 @@ class CardProfile:
@abc.abstractmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
"""Try to see if the specific profile matches the card. This method is a
placeholder that is overloaded by specific dirived classes. The method
placeholder that is overloaded by specific derived classes. The method
actively probes the card to make sure the profile class matches the
physical card. This usually also means that the card is reset during
the process, so this method must not be called at random times. It may

View File

@@ -23,6 +23,9 @@ from osmocom.tlv import bertlv_parse_one
from pySim.exceptions import *
from pySim.filesystem import *
from pySim.log import PySimLogger
log = PySimLogger.get("RUNTIME")
def lchan_nr_from_cla(cla: int) -> int:
"""Resolve the logical channel number from the CLA byte."""
@@ -44,6 +47,7 @@ class RuntimeState:
card : pysim.cards.Card instance
profile : CardProfile instance
"""
self.mf = CardMF(profile=profile)
self.card = card
self.profile = profile
@@ -60,10 +64,13 @@ class RuntimeState:
self.card.set_apdu_parameter(
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
# make sure MF is selected before probing for Addons
self.lchan[0].select('MF')
for addon_cls in self.profile.addons:
addon = addon_cls()
if addon.probe(self.card):
print("Detected %s Add-on \"%s\"" % (self.profile, addon))
log.info("Detected %s Add-on \"%s\"" % (self.profile, addon))
for f in addon.files_in_mf:
self.mf.add_file(f)
@@ -97,18 +104,18 @@ class RuntimeState:
apps_taken = []
if aids_card:
aids_taken = []
print("AIDs on card:")
log.info("AIDs on card:")
for a in aids_card:
for f in apps_profile:
if f.aid in a:
print(" %s: %s (EF.DIR)" % (f.name, a))
log.info(" %s: %s (EF.DIR)" % (f.name, a))
aids_taken.append(a)
apps_taken.append(f)
aids_unknown = set(aids_card) - set(aids_taken)
for a in aids_unknown:
print(" unknown: %s (EF.DIR)" % a)
log.info(" unknown: %s (EF.DIR)" % a)
else:
print("warning: EF.DIR seems to be empty!")
log.warn("EF.DIR seems to be empty!")
# Some card applications may not be registered in EF.DIR, we will actively
# probe for those applications
@@ -123,7 +130,7 @@ class RuntimeState:
_data, sw = self.card.select_adf_by_aid(f.aid)
self.selected_adf = f
if sw == "9000":
print(" %s: %s" % (f.name, f.aid))
log.info(" %s: %s" % (f.name, f.aid))
apps_taken.append(f)
except (SwMatchError, ProtocolError):
pass
@@ -147,7 +154,7 @@ class RuntimeState:
# select MF to reset internal state and to verify card really works
self.lchan[0].select('MF', cmd_app)
self.lchan[0].selected_adf = None
# store ATR as part of our card identies dict
# store ATR as part of our card identities dict
self.identity['ATR'] = atr
return atr
@@ -321,7 +328,7 @@ class RuntimeLchan:
# If we succeed, we know that the file exists on the card and we may
# proceed with creating a new CardEF object in the local file model at
# run time. In case the file does not exist on the card, we just abort.
# The state on the card (selected file/application) wont't be changed,
# The state on the card (selected file/application) won't be changed,
# so we do not have to update any state in that case.
(data, _sw) = self.scc.select_file(fid)
except SwMatchError as swm:
@@ -470,11 +477,15 @@ class RuntimeLchan:
def get_file_for_filename(self, name: str):
"""Get the related CardFile object for a specified filename."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables()
return sels[name]
def activate_file(self, name: str):
"""Request ACTIVATE FILE of specified file."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables()
f = sels[name]
data, sw = self.scc.activate_file(f.fid)
@@ -507,6 +518,47 @@ class RuntimeLchan:
dec_data = self.selected_file.decode_hex(data)
return (dec_data, sw)
def __get_writeable_size(self):
""" Determine the writable size (file or record) using the cached FCP parameters of the currently selected
file. Return None in case the writeable size cannot be determined (no FCP available, FCP lacks size
information).
"""
fcp = self.selected_file_fcp
if not fcp:
return None
structure = fcp.get('file_descriptor', {}).get('file_descriptor_byte', {}).get('structure')
if not structure:
return None
if structure == 'transparent':
return fcp.get('file_size')
elif structure == 'linear_fixed':
return fcp.get('file_descriptor', {}).get('record_len')
else:
return None
def __check_writeable_size(self, data_len):
""" Guard against unsuccessful writes caused by attempts to write data that exceeds the file limits. """
writeable_size = self.__get_writeable_size()
if not writeable_size:
return
if isinstance(self.selected_file, TransparentEF):
writeable_name = "file"
elif isinstance(self.selected_file, LinFixedEF):
writeable_name = "record"
else:
writeable_name = "object"
if data_len > writeable_size:
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
(data_len, writeable_name, writeable_size, data_len - writeable_size))
elif data_len < writeable_size:
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
def update_binary(self, data_hex: str, offset: int = 0):
"""Update transparent EF binary data.
@@ -517,6 +569,7 @@ class RuntimeLchan:
if not isinstance(self.selected_file, TransparentEF):
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
self.selected_file.__class__.__mro__))
self.__check_writeable_size(len(data_hex) // 2 + offset)
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
def update_binary_dec(self, data: dict):
@@ -564,6 +617,7 @@ class RuntimeLchan:
if not isinstance(self.selected_file, LinFixedEF):
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
self.selected_file.__class__.__mro__))
self.__check_writeable_size(len(data_hex) // 2)
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
conserve=self.rs.conserve_write,
leftpad=self.selected_file.leftpad)

View File

@@ -32,7 +32,7 @@ class SecureChannel(abc.ABC):
pass
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
"""Wrapper function to wrap command APDU and unwrap repsonse APDU around send_apdu callable."""
"""Wrapper function to wrap command APDU and unwrap response APDU around send_apdu callable."""
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
res, sw = send_fn(pdu_wrapped, *args, **kwargs)
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))

View File

@@ -30,13 +30,6 @@ from smpp.pdu import pdu_types, operations
BytesOrHex = typing.Union[Hexstr, bytes]
# 07
# 00 03 000201 # part 01 of 02 in reference 00
# 70 00
# 05
# 00 03 000202
class UserDataHeader:
# a single IE in the user data header
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))

View File

@@ -162,28 +162,6 @@ class EF_SIM_AUTH_KEY(TransparentEF):
'key'/Bytes(16),
'op_opc' /Bytes(16))
class EF_HTTPS_CFG(TransparentEF):
def __init__(self, fid='6f2a', name='EF.HTTPS_CFG'):
super().__init__(fid, name=name, desc='HTTPS configuration')
class EF_HTTPS_KEYS(TransparentEF):
KeyRecord = Struct('security_domain'/Int8ub,
'key_type'/Enum(Int8ub, des=0x80, psk=0x85, aes=0x88),
'key_version'/Int8ub,
'key_id'/Int8ub,
'key_length'/Int8ub,
'key'/Bytes(this.key_length))
def __init__(self, fid='6f2b', name='EF.HTTPS_KEYS'):
super().__init__(fid, name=name, desc='HTTPS PSK and DEK keys')
self._construct = GreedyRange(self.KeyRecord)
class EF_HTTPS_POLL(TransparentEF):
TimeUnit = Enum(Int8ub, seconds=0, minutes=1, hours=2, days=3, ten_days=4)
def __init__(self, fid='6f2c', name='EF.HTTPS_POLL'):
super().__init__(fid, name=name, desc='HTTPS polling interval')
self._construct = Struct(Const(b'\x82'), 'time_unit'/self.TimeUnit, 'value'/Int8ub,
'adm_session_triggering_tlv'/GreedyBytes)
class DF_SYSTEM(CardDF):
def __init__(self):
@@ -202,9 +180,6 @@ class DF_SYSTEM(CardDF):
EF_0348_COUNT(),
EF_GP_COUNT(),
EF_GP_DIV_DATA(),
EF_HTTPS_CFG(),
EF_HTTPS_KEYS(),
EF_HTTPS_POLL(),
]
self.add_files(files)

View File

@@ -200,7 +200,7 @@ class LinkBase(abc.ABC):
# It *was* successful after all -- the extra pieces FETCH handled
# need not concern the caller.
rv = (rv[0], '9000')
# proactive sim as per TS 102 221 Setion 7.4.2
# proactive sim as per TS 102 221 Section 7.4.2
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
# Setting this in case we later decide not to send a terminal
@@ -228,7 +228,7 @@ class LinkBase(abc.ABC):
# Structure as per TS 102 223 V4.4.0 Section 6.8
# Testing hint: The value of tail does not influence the behavior
# of an SJA2 that sent ans SMS, so this is implemented only
# of an SJA2 that sent an SMS, so this is implemented only
# following TS 102 223, and not fully tested.
ti_list_bin = [x.to_tlv() for x in ti_list]
tail = b''.join(ti_list_bin)
@@ -333,13 +333,11 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
from pySim.transport.pcsc import PcscSimLink
from pySim.transport.modem_atcmd import ModemATCommandLink
from pySim.transport.calypso import CalypsoSimLink
from pySim.transport.wsrc import WsrcSimLink
SerialSimLink.argparse_add_reader_args(arg_parser)
PcscSimLink.argparse_add_reader_args(arg_parser)
ModemATCommandLink.argparse_add_reader_args(arg_parser)
CalypsoSimLink.argparse_add_reader_args(arg_parser)
WsrcSimLink.argparse_add_reader_args(arg_parser)
arg_parser.add_argument('--apdu-trace', action='store_true',
help='Trace the command/response APDUs exchanged with the card')
@@ -362,9 +360,6 @@ def init_reader(opts, **kwargs) -> LinkBase:
elif opts.modem_dev is not None:
from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs)
elif opts.wsrc_server_url is not None:
from pySim.transport.wsrc import WsrcSimLink
sl = WsrcSimLink(opts, **kwargs)
else: # Serial reader is default
print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink

View File

@@ -1,101 +0,0 @@
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from typing import Optional
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
from pySim.transport import LinkBase
from pySim.utils import ResTuple
from pySim.wsrc.client_blocking import WsClientBlocking
class UserWsClientBlocking(WsClientBlocking):
def perform_outbound_hello(self):
hello_data = {
'client_type': 'user',
}
super().perform_outbound_hello(hello_data)
def select_card(self, id_type:str, id_str:str):
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
'select_card_ack')
return rx
def reset_card(self):
self.transceive_json('reset_req', {}, 'reset_resp')
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return rx['response'], rx['sw']
class WsrcSimLink(LinkBase):
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
name = 'WSRC'
def __init__(self, opts: argparse.Namespace, **kwargs):
super().__init__(**kwargs)
self.identities = {}
self.server_url = opts.wsrc_server_url
if opts.wsrc_eid:
self.id_type = 'eid'
self.id_str = opts.wsrc_eid
elif opts.wsrc_iccid:
self.id_type = 'iccid'
self.id_str = opts.wsrc_iccid
self.client = UserWsClientBlocking(self.server_url)
self.client.connect()
def __del__(self):
# FIXME: disconnect from server
pass
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
self.connect()
def connect(self):
rx = self.client.select_card(self.id_type, self.id_str)
self.identitites = rx['identities']
def get_atr(self) -> Hexstr:
return self.identities['atr']
def disconnect(self):
self.__delete__()
def _reset_card(self):
self.client.reset_card()
return 1
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
return self.client.xceive_apdu_raw(pdu)
def __str__(self) -> str:
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
@staticmethod
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
can be accessed via a network.""")
wsrc_group.add_argument('--wsrc-server-url',
help='URI of the WSRC server to connect to')
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
help='ICCID of the card to open via WSRC')
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
help='EID of the card to open via WSRC')

View File

@@ -750,7 +750,7 @@ class EF_ARR(LinFixedEF):
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
(data, _sw) = self._cmd.lchan.read_record_dec(opts.RECORD_NR)
data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline)

View File

@@ -208,7 +208,7 @@ EF_5G_PROSE_ST_map = {
5: '5G ProSe configuration data for usage information reporting',
}
# Mapping between USIM Enbled Service Number and its description
# Mapping between USIM Enabled Service Number and its description
EF_EST_map = {
1: 'Fixed Dialling Numbers (FDN)',
2: 'Barred Dialling Numbers (BDN)',
@@ -389,7 +389,10 @@ class EF_SUCI_Calc_Info(TransparentEF):
# remaining data holds Home Network Public Key Data Object
hpkl = EF_SUCI_Calc_Info.HnetPubkeyList()
hpkl.from_tlv(in_bytes[pos:])
hnet_pubkey_list = self._compact_pubkey_list(hpkl.to_dict()['hnet_pubkey_list'])
hnet_pubkey_list = []
if hpkl.to_dict()['hnet_pubkey_list']:
hnet_pubkey_list = self._compact_pubkey_list(hpkl.to_dict()['hnet_pubkey_list'])
return {
'prot_scheme_id_list': prot_scheme_id_list,

View File

@@ -119,7 +119,7 @@ class EF_AC_GBAUAPI(LinFixedEF):
"""The use of this EF is eescribed in 3GPP TS 31.130"""
class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
# length field whihc will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
# length field which will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
# safely be assumed shorter than that
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
'naf_id'/Prefixed(Int8ub, GreedyBytes))

View File

@@ -1007,7 +1007,7 @@ class EF_ICCID(TransparentEF):
def _encode_hex(self, abstract, **kwargs):
return enc_iccid(abstract['iccid'])
# TS 102 221 Section 13.3 / TS 31.101 Secction 13 / TS 51.011 Section 10.1.2
# TS 102 221 Section 13.3 / TS 31.101 Section 13 / TS 51.011 Section 10.1.2
class EF_PL(TransRecEF):
_test_de_encode = [
( '6465', "de" ),

View File

@@ -15,6 +15,7 @@ from osmocom.tlv import bertlv_encode_tag, bertlv_encode_len
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
# Copyright (C) 2009-2022 Ludovic Rousseau
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -330,7 +331,7 @@ def derive_mnc(digit1: int, digit2: int, digit3: int = 0x0f) -> int:
mnc = 0
# 3-rd digit is optional for the MNC. If present
# the algorythm is the same as for the MCC.
# the algorithm is the same as for the MCC.
if digit3 != 0x0f:
return derive_mcc(digit1, digit2, digit3)
@@ -410,7 +411,7 @@ def get_addr_type(addr):
fqdn_flag = True
for i in addr_list:
# Only Alpha-numeric characters and hyphen - RFC 1035
# Only Alphanumeric characters and hyphen - RFC 1035
import re
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
fqdn_flag = False
@@ -476,7 +477,7 @@ def expand_hex(hexstring, length):
"""Expand a given hexstring to a specified length by replacing "." or ".."
with a filler that is derived from the neighboring nibbles respective
bytes. Usually this will be the nibble respective byte before "." or
"..", execpt when the string begins with "." or "..", then the nibble
"..", except when the string begins with "." or "..", then the nibble
respective byte after "." or ".." is used.". In case the string cannot
be expanded for some reason, the input string is returned unmodified.
@@ -585,10 +586,138 @@ def parse_command_apdu(apdu: bytes) -> int:
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
def normalizeATR(atr):
"""Transform an ATR in list of integers.
valid input formats are
"3B A7 00 40 18 80 65 A2 08 01 01 52"
"3B:A7:00:40:18:80:65:A2:08:01:01:52"
Args:
atr: string
Returns:
list of bytes
>>> normalize("3B:A7:00:40:18:80:65:A2:08:01:01:52")
[59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82]
"""
atr = atr.replace(":", "")
atr = atr.replace(" ", "")
res = []
while len(atr) >= 2:
byte, atr = atr[:2], atr[2:]
res.append(byte)
if len(atr) > 0:
raise ValueError("warning: odd string, remainder: %r" % atr)
atr = [int(x, 16) for x in res]
return atr
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
def decomposeATR(atr_txt):
"""Decompose the ATR in elementary fields
Args:
atr_txt: ATR as a hex bytes string
Returns:
dictionary of field and values
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
{ 'T0': {'value': 167},
'TB': {1: {'value': 0}},
'TC': {2: {'value': 24}},
'TD': {1: {'value': 64}},
'TS': {'value': 59},
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
'hbn': 7}
"""
ATR_PROTOCOL_TYPE_T0 = 0
atr_txt = normalizeATR(atr_txt)
atr = {}
# the ATR itself as a list of integers
atr["atr"] = atr_txt
# store TS and T0
atr["TS"] = {"value": atr_txt[0]}
TDi = atr_txt[1]
atr["T0"] = {"value": TDi}
hb_length = TDi & 15
pointer = 1
# protocol number
pn = 1
# store number of historical bytes
atr["hbn"] = TDi & 0xF
while pointer < len(atr_txt):
# Check TAi is present
if (TDi | 0xEF) == 0xFF:
pointer += 1
if "TA" not in atr:
atr["TA"] = {}
atr["TA"][pn] = {"value": atr_txt[pointer]}
# Check TBi is present
if (TDi | 0xDF) == 0xFF:
pointer += 1
if "TB" not in atr:
atr["TB"] = {}
atr["TB"][pn] = {"value": atr_txt[pointer]}
# Check TCi is present
if (TDi | 0xBF) == 0xFF:
pointer += 1
if "TC" not in atr:
atr["TC"] = {}
atr["TC"][pn] = {"value": atr_txt[pointer]}
# Check TDi is present
if (TDi | 0x7F) == 0xFF:
pointer += 1
if "TD" not in atr:
atr["TD"] = {}
TDi = atr_txt[pointer]
atr["TD"][pn] = {"value": TDi}
if (TDi & 0x0F) != ATR_PROTOCOL_TYPE_T0:
atr["TCK"] = True
pn += 1
else:
break
# Store historical bytes
atr["hb"] = {"value": atr_txt[pointer + 1 : pointer + 1 + hb_length]}
# Store TCK
last = pointer + 1 + hb_length
if "TCK" in atr:
try:
atr["TCK"] = {"value": atr_txt[last]}
except IndexError:
atr["TCK"] = {"value": -1}
last += 1
if len(atr_txt) > last:
atr["extra"] = atr_txt[last:]
if len(atr["hb"]["value"]) < hb_length:
missing = hb_length - len(atr["hb"]["value"])
if missing > 1:
(t1, t2) = ("s", "are")
else:
(t1, t2) = ("", "is")
atr["warning"] = "ATR is truncated: %d byte%s %s missing" % (missing, t1, t2)
return atr
class DataObject(abc.ABC):
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
simply has any number of different TLVs that may occur in any order at any point, ISO 7816
has the habit of specifying TLV data but with very spcific ordering, or specific choices of
has the habit of specifying TLV data but with very specific ordering, or specific choices of
tags at specific points in a stream. This class tries to represent this."""
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
@@ -710,7 +839,7 @@ class TL0_DataObject(DataObject):
class DataObjectCollection:
"""A DataObjectCollection consits of multiple Data Objects identified by their tags.
"""A DataObjectCollection consists of multiple Data Objects identified by their tags.
A given encoded DO may contain any of them in any order, and may contain multiple instances
of each DO."""

View File

View File

@@ -1,65 +0,0 @@
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import abc
import json
import logging
from typing import Optional
from websockets.sync.client import connect
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
class WsClientBlocking(abc.ABC):
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
def __init__(self, ws_uri: str):
self.ws_uri = ws_uri
self.ws = None
def connect(self):
self.ws = connect(uri=self.ws_uri)
self.perform_outbound_hello()
def tx_json(self, msg_type: str, d: dict = {}):
"""JSON-Encode and transmit a message to the given websocket."""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
self.ws.send(d_js)
def tx_error(self, message: str):
event = {
'message': message,
}
self.tx_json('error', event)
def rx_json(self):
"""Receive a single message from the given websocket and JSON-decode it."""
rx = self.ws.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
self.tx_json(tx_msg_type, tx_d)
rx = self.rx_json()
assert rx['msg_type'] == rx_msg_type
return rx
def perform_outbound_hello(self, tx: dict = {}):
self.tx_json('hello', tx)
rx = self.rx_json()
assert rx['msg_type'] == 'hello_ack'
return rx
def rx_and_execute_cmd(self):
"""Receve and dispatch/execute a single command from the server."""
rx = self.rx_json()
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
if handler:
handler(rx)
else:
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])

View File

@@ -1,3 +1,6 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.pylint.main]
ignored-classes = ["twisted.internet.reactor"]

View File

@@ -1,7 +1,7 @@
pyscard
pyserial
pytlv
cmd2>=1.5
cmd2>=2.6.2,<3.0
jsonpath-ng
construct>=2.10.70
bidict
@@ -15,4 +15,3 @@ git+https://github.com/osmocom/asn1tools
packaging
git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
smpplib

View File

@@ -21,7 +21,7 @@ setup(
"pyscard",
"pyserial",
"pytlv",
"cmd2 >= 1.5.0",
"cmd2 >= 1.5.0, < 3.0",
"jsonpath-ng",
"construct >= 2.10.70",
"bidict",
@@ -49,4 +49,12 @@ setup(
'asn1/saip/*.asn',
],
},
extras_require={
"smdpp": [
"klein",
"service-identity",
"pyopenssl",
"requests",
]
},
)

View File

@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICgzCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
HQ8BAf8EBAMCB4AwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMBQG
A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0gA
MEUCIQCfaGcMk+kuSJsbIyRPWttwWNftwQdHCQuu346PaiA2FAIgUrqhPw2um9gV
C+eWHaXio7WQh5L6VgLZzNifTQcldD4=
-----END CERTIFICATE-----

View File

@@ -1,7 +1,7 @@
-----BEGIN CERTIFICATE-----
MIICgjCCAiigAwIBAgIBCjAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MIICgjCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
SVQwHhcNMjUwNDIzMTUyMzA1WhcNMzUwNDIxMTUyMzA1WjAzMQ0wCwYDVQQKDARB
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
@@ -11,6 +11,6 @@ qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
RQIgYakBZP6y9/2iu9aZkgb89SQgfRb0TKVMGElWgtyoX2gCIQCT8o/TkAxhWCTY
yaBDMi1L9Ub+93ef5s+S+eHLmwuudA==
RQIhAL+1lp/hGsj87/5RqOX2u3hS/VSftDN7EPrHJJFnTXLRAiBVxemKIKmC7+W1
+RsTY5I51R+Cyoq4l5TEU49eplo5bw==
-----END CERTIFICATE-----

View File

@@ -1,142 +0,0 @@
#!/usr/bin/env python3
import logging
import sys
from pprint import pprint as pp
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
import smpplib.gsm
import smpplib.client
import smpplib.consts
logger = logging.getLogger(__name__)
# if you want to know what's happening
logging.basicConfig(level='DEBUG')
class Foo:
def smpp_rx_handler(self, pdu):
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
if pdu.short_message:
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
pp(dec)
return None
def __init__(self):
# Two parts, UCS2, SMS with UDH
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
# Print when obtain message_id
client.set_message_sent_handler(
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
#client.set_message_received_handler(
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
client.set_message_received_handler(self.smpp_rx_handler)
client.connect()
client.bind_transceiver(system_id='test', password='test')
self.client = client
if False:
KIC1 = h2b('000102030405060708090a0b0c0d0e0f')
KID1 = h2b('101112131415161718191a1b1c1d1e1f')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
if False:
KIC1 = h2b('4BE2D58A1FA7233DD723B3C70996E6E6')
KID1 = h2b('4a664208eba091d32c4ecbc299da1f34')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
#KIC3 = h2b('A4074D8E1FE69B484A7E62682ED09B51')
#KID3 = h2b('41FF1033910112DB4EBEBB7807F939CD')
#self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
# algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
#self.tar = h2b('B00011') # USIM RFM
self.tar = h2b('000000') # RAM
if False: # sysmoEUICC1-C2G
KIC1 = h2b('B52F9C5938D1C19ED73E1AE772937FD7')
KID1 = h2b('3BC696ACD1EEC95A6624F7330D22FC81')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
if False: # TS.48 profile
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
KID1 = h2b('112233445566778899aabbccddeeff10')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
self.tar = h2b('000000') # ISD-P according to FIXME
if False: # TS.48 profile AES
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
KID1 = h2b('112233445566778899aabbccddeeff10')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2, kic=KIC1,
algo_auth='aes_cmac', kid_idx=2, kid=KID1)
self.tar = h2b('000000') # ISD-P according to FIXME
if True: # eSIM profile
KIC1 = h2b('207c5d2c1aa80d58cd8f542fb9ef2f80')
KID1 = h2b('312367f1681902fd67d9a71c62a840e3')
#KIC1 = h2b('B6B0E130EEE1C9A4A29DAEC034D35C9E')
#KID1 = h2b('889C51CD2A381A9D4EDDA9224C24A9AF')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.ota_keyset.cntr = 7
self.tar = h2b('b00001') # ADF.USIM
self.ota_dialect = OtaDialectSms()
self.spi = {'counter':'counter_must_be_higher', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def tx_sms_tpdu(self, tpdu: bytes):
self.client.send_message(
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure it is a byte string, not unicode:
source_addr='12',
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure thease two params are byte strings, not unicode:
destination_addr='23',
short_message=tpdu,
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
#registered_delivery=True,
)
def tx_c_apdu(self, apdu: bytes):
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
f = Foo()
print("initialized")
#f.tx_c_apdu(h2b('80a40400023f00'))
#f.tx_c_apdu(h2b('80EC010100'))
#f.tx_c_apdu(h2b('80EC0101' + '0E' + '350103' + '390203e8' + '3e052101020304'))
f.tx_c_apdu(h2b('00a40004026f07' + '00b0000009'))
f.client.listen()

View File

@@ -2,7 +2,7 @@ Detected UICC Add-on "SIM"
Detected UICC Add-on "GSM-R"
Detected UICC Add-on "RUIM"
Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
warning: EF.DIR seems to be empty!
EF.DIR seems to be empty!
ADF.ECASD: a0000005591010ffffffff8900000200
ADF.ISD-R: a0000005591010ffffffff8900000100
ISIM: a0000000871004

View File

@@ -0,0 +1,5 @@
"card_type_id","formfactor_id","imsi","iccid","pin1","puk1","pin2","puk2","ki","adm1","adm2","proprietary","kic1","kic2","kic3","kid1","kid2","kid3","kik1","kik2","kik3","msisdn","acc","opc"
"myCardType","3FF","901700000000001","8988211000000000001","1234","12345678","1223","12345678","AAAAAAAAAAA5435425AAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 01","BBBBBBBBBB3324BBBBBBBB21212BBBBB","CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC","DDDD90DDDDDDDDDDDDDDDDDD767DDDDD","EEEEEE567657567567EEEEEEEEEEEEEE","FFFFFFFFFFFFFFFFFFF56765765FFFFF","11111567811111111111111111111111","22222222222222222227669999222222","33333333333333333333333333333333","44444444444444445234544444444444","55555555555","0001","66666666666666666666666666666666"
"myCardType","3FF","901700000000002","8988211000000000002","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA","10101010","9999999999999999","proprietary data 02","BBBBBB421BBBBBBBBBB12BBBBBBBBBBB","CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC","DDDDDDDDD567657DDDD2DDDDDDDDDDDD","EEEEEEEE56756EEEEEEEEE567657EEEE","FFFFF567657FFFFFFFFFFFFFFFFFFFFF","11111111111146113433411576511111","22222222222223432225765222222222","33333333333333523453333333333333","44425435234444444444446544444444","55555555555","0001","66666666666666266666666666666666"
"myCardType","3FF","901700000000003","8988211000000000003","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 03","BBBBBBB45678BBBB756765BBBBBBBBBB","CCCCCCCCCCCCCC76543CCCC56765CCCC","DDDDDDDDDDDDDDDDDD5676575DDDDDDD","EEEEEEEEEEEEEEEEEE56765EEEEEEEEE","FFFFFFFFFFFFFFF567657FFFFFFFFFFF","11111111119876511111111111111111","22222222222444422222222222576522","33333332543333576733333333333333","44444444444567657567444444444444","55555555555","0001","66666675676575666666666666666666"
1 card_type_id formfactor_id imsi iccid pin1 puk1 pin2 puk2 ki adm1 adm2 proprietary kic1 kic2 kic3 kid1 kid2 kid3 kik1 kik2 kik3 msisdn acc opc
2 myCardType 3FF 901700000000001 8988211000000000001 1234 12345678 1223 12345678 AAAAAAAAAAA5435425AAAAAAAAAAAAAA 10101010 9999999999999999 proprietary data 01 BBBBBBBBBB3324BBBBBBBB21212BBBBB CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC DDDD90DDDDDDDDDDDDDDDDDD767DDDDD EEEEEE567657567567EEEEEEEEEEEEEE FFFFFFFFFFFFFFFFFFF56765765FFFFF 11111567811111111111111111111111 22222222222222222227669999222222 33333333333333333333333333333333 44444444444444445234544444444444 55555555555 0001 66666666666666666666666666666666
3 myCardType 3FF 901700000000002 8988211000000000002 1234 12345678 1223 12345678 AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA 10101010 9999999999999999 proprietary data 02 BBBBBB421BBBBBBBBBB12BBBBBBBBBBB CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC DDDDDDDDD567657DDDD2DDDDDDDDDDDD EEEEEEEE56756EEEEEEEEE567657EEEE FFFFF567657FFFFFFFFFFFFFFFFFFFFF 11111111111146113433411576511111 22222222222223432225765222222222 33333333333333523453333333333333 44425435234444444444446544444444 55555555555 0001 66666666666666266666666666666666
4 myCardType 3FF 901700000000003 8988211000000000003 1234 12345678 1223 12345678 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA 10101010 9999999999999999 proprietary data 03 BBBBBBB45678BBBB756765BBBBBBBBBB CCCCCCCCCCCCCC76543CCCC56765CCCC DDDDDDDDDDDDDDDDDD5676575DDDDDDD EEEEEEEEEEEEEEEEEE56765EEEEEEEEE FFFFFFFFFFFFFFF567657FFFFFFFFFFF 11111111119876511111111111111111 22222222222444422222222222576522 33333332543333576733333333333333 44444444444567657567444444444444 55555555555 0001 66666675676575666666666666666666

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
import unittest
import os
from pySim.card_key_provider import *
class TestCardKeyProviderCsv(unittest.TestCase):
def __init__(self, *args, **kwargs):
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
"OPC" : "000102030405065545645645645D0E0F",
"KIC1" : "06410203546406456456450B0C0D0E0F",
"KID1" : "00040267840507667609045645645E0F",
"KIK1" : "0001020307687607668678678C0D0E0F",
"KIC2" : "000142457594860706090A0B0688678F",
"KID2" : "600102030405649468690A0B0C0D648F",
"KIK2" : "00010203330506070496330B08640E0F",
"KIC3" : "000104030405064684686A068C0D0E0F",
"KID3" : "00010243048468070809060B0C0D0E0F",
"KIK3" : "00010204040506070809488B0C0D0E0F"}
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
super().__init__(*args, **kwargs)
def test_card_key_provider_get(self):
test_data = [{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': '48a6d5f60567d45299e3ba08594009e7', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '3eb8567fa0b4b1e63bcab13bff5f2702',
'KIC2': 'fd6c173a5b3f04b563808da24237fb46',
'KIC3': '66c8c848e5dff69d70689d155d44f323',
'KID1': 'd78accce870332dced467c173244dd94',
'KID2': 'b3bf050969747b2d2c9389e127a3d791',
'KID3': '40a77deb50d260b3041bbde1b5040625',
'KIK1': '451b503239d818ea34421aa9c2a8887a',
'KIK2': '967716f5fca8ae179f87f76524d1ae6b',
'KIK3': '0884db5eee5409a00fc1bbc57ac52541',
'OPC': '81817574c1961dd272ad080eb2caf279'}, 'ICCID' :"8988211000000000001"},
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': 'e94d7fa6fb92375dae86744ff6ecef49', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '79b4e39387c66253da68f653381ded44',
'KIC2': '560561b5dba89c1da8d1920049e5e4f7',
'KIC3': '79ff35e84e39305a119af8c79f84e8e5',
'KID1': '233baf89122159553d67545ecedcf8e0',
'KID2': '8fc2874164d7a8e40d72c968bc894ab8',
'KID3': '2e3320f0dda85054d261be920fbfa065',
'KIK1': 'd51b1b17630103d1672a3e9e0e4827ed',
'KIK2': 'd01edbc48be555139506b0d7982bf7ff',
'KIK3': 'a6487a5170849e8e0a03026afea91f5a',
'OPC': '6b0d19ef28bd12f2daac31828d426939'}, 'ICCID' :"8988211000000000002"},
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
'KI': '3cdec1552ef433a89f327905213c5a6e', 'ADM1': '10101010',
'ADM2': '9999999999999999', 'KIC1': '72986b13ce505e12653ad42df5cfca13',
'KIC2': '8f0d1e58b01e833773e5562c4940674d',
'KIC3': '9c72ba5a14d54f489edbffd3d8802f03',
'KID1': 'd23a42995df9ca83f74b2cfd22695526',
'KID2': '5c3a189d12aa1ac6614883d7de5e6c8c',
'KID3': 'a6ace0d303a2b38a96b418ab83c16725',
'KIK1': 'bf2319467d859c12527aa598430caef2',
'KIK2': '6a4c459934bea7e40787976b8881ab01',
'KIK3': '91cd02c38b5f68a98cc90a1f2299538f',
'OPC': '6df46814b1697daca003da23808bbbc3'}, 'ICCID' :"8988211000000000003"}]
for t in test_data:
result = card_key_provider_get(["PIN1","PUK1","PIN2","PUK2","KI","ADM1","ADM2","KIC1",
"KIC2","KIC3","KID1","KID2","KID3","KIK1","KIK2","KIK3","OPC"],
"ICCID", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
result = card_key_provider_get(["PIN1","puk1","PIN2","PUK2","KI","adm1","ADM2","KIC1",
"KIC2","kic3","KID1","KID2","KID3","kik1","KIK2","KIK3","OPC"],
"iccid", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
def test_card_key_provider_get_field(self):
test_data = [{'EXPECTED' : "3eb8567fa0b4b1e63bcab13bff5f2702", 'ICCID' :"8988211000000000001"},
{'EXPECTED' : "79b4e39387c66253da68f653381ded44", 'ICCID' :"8988211000000000002"},
{'EXPECTED' : "72986b13ce505e12653ad42df5cfca13", 'ICCID' :"8988211000000000003"}]
for t in test_data:
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
for t in test_data:
result = card_key_provider_get_field("kic1", "iccid", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED'))
class TestCardKeyFieldCryptor(unittest.TestCase):
def __init__(self, *args, **kwargs):
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
"OPC" : "000102030405065545645645645D0E0F",
"KIC1" : "06410203546406456456450B0C0D0E0F",
"UICC_SCP03" : "00040267840507667609045645645E0F"}
self.crypt = CardKeyFieldCryptor(transport_keys)
super().__init__(*args, **kwargs)
def test_encrypt_field(self):
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
def test_decrypt_field(self):
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
if __name__ == "__main__":
unittest.main()

View File

@@ -90,34 +90,5 @@ class OidTest(unittest.TestCase):
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
class NonMatchTest(unittest.TestCase):
def test_nonmatch(self):
# non-matches before, in between and after matches
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
nm_list = NonMatch.from_matchlist(match_list, 26)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
NonMatch(a=24, b=24, size=2)])
def test_nonmatch_beg(self):
# single match at beginning
match_list = [Match(a=0, b=0, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 20)
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
def test_nonmatch_end(self):
# single match at end
match_list = [Match(a=19, b=19, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
def test_nonmatch_none(self):
# no match at all
match_list = []
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
if __name__ == "__main__":
unittest.main()

121
tests/unittests/test_log.py Executable file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
# (C) 2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import logging
from pySim.log import PySimLogger
import io
import sys
from inspect import currentframe, getframeinfo
log = PySimLogger.get("TEST")
TEST_MSG_DEBUG = "this is a debug message"
TEST_MSG_INFO = "this is an info message"
TEST_MSG_WARNING = "this is a warning message"
TEST_MSG_ERROR = "this is an error message"
TEST_MSG_CRITICAL = "this is a critical message"
expected_message = None
class PySimLogger_Test(unittest.TestCase):
def __test_01_safe_defaults_one(self, callback, message:str):
# When log messages are sent to an unconfigured PySimLogger class, we expect the unmodified message being
# logged to stdout, just as if it were printed via a normal print() statement.
log_output = io.StringIO()
sys.stdout = log_output
callback(message)
assert(log_output.getvalue().strip() == message)
sys.stdout = sys.__stdout__
def test_01_safe_defaults(self):
# When log messages are sent to an unconfigured PySimLogger class, we expect that all messages are logged,
# regardless of the logging level.
self.__test_01_safe_defaults_one(log.debug, TEST_MSG_DEBUG)
self.__test_01_safe_defaults_one(log.info, TEST_MSG_INFO)
self.__test_01_safe_defaults_one(log.warning, TEST_MSG_WARNING)
self.__test_01_safe_defaults_one(log.error, TEST_MSG_ERROR)
self.__test_01_safe_defaults_one(log.critical, TEST_MSG_CRITICAL)
@staticmethod
def _test_print_callback(message):
assert(message.strip() == expected_message)
def test_02_normal(self):
# When the PySimLogger is set up with its default values, we expect formatted log messages on all logging
# levels.
global expected_message
PySimLogger.setup(self._test_print_callback)
expected_message = "DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG)
expected_message = "INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = "WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = "ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
def test_03_verbose(self):
# When the PySimLogger is set up with its default values, we expect verbose formatted log messages on all
# logging levels.
global expected_message
PySimLogger.setup(self._test_print_callback)
PySimLogger.set_verbose(True)
frame = currentframe()
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
log.debug(TEST_MSG_DEBUG)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
def test_04_level(self):
# When the PySimLogger is set up with its default values, we expect formatted log messages but since we will
# limit the log level to INFO, we should not see any messages of level DEBUG
global expected_message
PySimLogger.setup(self._test_print_callback)
PySimLogger.set_level(logging.INFO)
# We test this in non verbose mode, this will also confirm that disabeling the verbose mode works.
PySimLogger.set_verbose(False)
# Debug messages should not appear
expected_message = None
log.debug(TEST_MSG_DEBUG)
# All other messages should appear normally
expected_message = "INFO: " + TEST_MSG_INFO
log.info(TEST_MSG_INFO)
expected_message = "WARNING: " + TEST_MSG_WARNING
log.warning(TEST_MSG_WARNING)
expected_message = "ERROR: " + TEST_MSG_ERROR
log.error(TEST_MSG_ERROR)
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
log.critical(TEST_MSG_CRITICAL)
if __name__ == '__main__':
unittest.main()

View File

@@ -16,16 +16,6 @@ SPI_CC_POR_CIPHERED_CC = {
'por': 'por_required'
}
SPI_CC_CTR_POR_UNCIPHERED_CC = {
'counter':'counter_must_be_higher',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
@@ -46,17 +36,6 @@ SPI_CC_POR_UNCIPHERED_NOCC = {
'por': 'por_required'
}
SPI_CC_UNCIPHERED = {
'counter':'no_counter',
'ciphering':False,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'no_rc_cc_ds',
'por': 'por_required'
}
######################################################################
# old-style code-driven test (lots of code copy+paste)
######################################################################
@@ -95,48 +74,6 @@ class Test_SMS_AES128(unittest.TestCase):
self.assertEqual(b2h(dec_tar), b2h(self.tar))
self.assertEqual(dec_spi, spi)
def test_open_channel(self):
spi = self.spi_base
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('000102030405060708090a0b0c0d0e0f'),
kid=h2b('101112131415161718191a1b1c1d1e1f'))
tpdu = h2b('40048111227ff6407070611535007e070003000201700000781516011212000001ccda206e8b0d46304247bf00bfdc9853eed2a826f9af8dc7c2974ce2cb9bb55cc1a8577e047cc8f5d450380ba86b25354fe69f58884f671d7ace0c911f7c74830dc1d58b62cce4934568697ba1f577eecbca26c5dbfa32b0e2f0877948a9fb46a122e4214947386f467de11c')
#'40048111227ff6407070611535000a0500030002027b6abed3'
submit = SMS_DELIVER.from_bytes(tpdu)
submit.tp_udhi = True
print(submit)
#print("UD: %s" % b2h(submit.tp_ud))
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
print("UDHD: %s" % udhd)
print("DATA: %s" % b2h(data))
tpdu2 = h2b('40048111227ff6407070611535000a0500030002027b6abed3')
submit2 = SMS_DELIVER.from_bytes(tpdu2)
print(submit2)
udhd2, data2 = UserDataHeader.from_bytes(submit2.tp_ud)
print("UDHD: %s" % udhd2)
print("DATA: %s" % b2h(data2))
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data + data2)
print(b2h(dec_tar), b2h(dec_apdu))
def test_open_channel2(self):
spi = self.spi_base
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('000102030405060708090a0b0c0d0e0f'),
kid=h2b('101112131415161718191a1b1c1d1e1f'))
tpdu = h2b('40048111227ff6407070611535004d02700000481516011212000001fe4c0943aea42e45021c078ae06c66afc09303608874b72f58bacadb0dcf665c29349c799fbb522e61709c9baf1890015e8e8e196e36153106c8b92f95153774')
submit = SMS_DELIVER.from_bytes(tpdu)
submit.tp_udhi = True
print(submit)
#print("UD: %s" % b2h(submit.tp_ud))
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
print("UDHD: %s" % udhd)
print("DATA: %s" % b2h(data))
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data)
print(b2h(dec_tar), b2h(dec_apdu))
class Test_SMS_3DES(unittest.TestCase):
tar = h2b('b00000')
@@ -241,13 +178,6 @@ OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
kic=h2b('200102030405060708090a0b0c0d0e0f'),
kid=h2b('201102030405060708090a0b0c0d0e0f'))
OTA_KEYSET_eSIM_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('207c5d2c1aa80d58cd8f542fb9ef2f80'),
kid=h2b('312367f1681902fd67d9a71c62a840e3'))
OTA_KEYSET_eSIM_AES128.cntr = 2
class OtaTestCase(unittest.TestCase):
def __init__(self, methodName='runTest', **kwargs):
super().__init__(methodName, **kwargs)
@@ -255,7 +185,6 @@ class OtaTestCase(unittest.TestCase):
# SIM RFM: B00010
# USIM RFM: B00011
self.tar = h2b('B00011')
self.tar = h2b('B00000')
class SmsOtaTestCase(OtaTestCase):
# Array describing the input/output data for the tests. We use the
@@ -264,102 +193,70 @@ class SmsOtaTestCase(OtaTestCase):
# manually writing one class per test.
testdatasets = [
{
# 'name': '3DES-SJA5-CIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_CIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
# },
# 'response': {
# 'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': '3DES-SJA5-UNCIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_UNCIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
# },
# 'response': {
# 'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': '3DES-SJA5-UNCIPHERED-NOCC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_UNCIPHERED_NOCC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
# },
# 'response': {
# 'encoded_resp': '027100000e0ab0001100000000000000016132',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': 'AES128-SJA5-CIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_AES128,
# 'spi': SPI_CC_POR_CIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
# },
# 'response': {
# 'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
'name': 'AES128-eSIM-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_eSIM_AES128,
'spi': SPI_CC_UNCIPHERED,
'name': '3DES-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
'encoded_cmd': '1502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
'encoded_tpdu': '40048111227FF6407070611535002702700000221502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
},
'response': {
'encoded_resp': '027100000B0AB000000000000000000A9000',
'response_status': 'insufficient_security_level',
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
}, {
'name': 'AES128-eSIM-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_eSIM_AES128,
'spi': SPI_CC_CTR_POR_UNCIPHERED_CC,
'name': '3DES-SJA5-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_CC,
'request': {
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
'encoded_cmd': '00281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
'encoded_tpdu': '40048111227FF6407070611535002D02700000281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
},
'response': {
'encoded_resp': '027100001C12B00000000000000200007DB7086951F7DB55AB0780010123026D009000',
#'encoded_resp': '027100000b0ab0000100000000000006'
#'encoded_resp': '027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c'
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-NOCC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
},
'response': {
'encoded_resp': '027100000e0ab0001100000000000000016132',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
}, {
'name': 'AES128-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_AES128,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
},
'response': {
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
},
# TODO: AES192
# TODO: AES256
]
@@ -375,7 +272,7 @@ class SmsOtaTestCase(OtaTestCase):
kset = t['ota_keyset']
outp = self.dialect.encode_cmd(kset, self.tar, t['spi'], apdu=t['request']['apdu'])
#print("result: %s" % b2h(outp))
self.assertEqual(b2h(outp), t['request']['encoded_cmd'].lower())
self.assertEqual(b2h(outp), t['request']['encoded_cmd'])
with_udh = b'\x02\x70\x00' + outp
#print("with_udh: %s" % b2h(with_udh))
@@ -384,7 +281,7 @@ class SmsOtaTestCase(OtaTestCase):
tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
#print("TPDU: %s" % tpdu)
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'].lower())
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
# also test decoder
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(kset, outp)
@@ -399,12 +296,9 @@ class SmsOtaTestCase(OtaTestCase):
r, d = self.dialect.decode_resp(kset, t['spi'], t['response']['encoded_resp'])
#print("RESP: %s / %s" % (r, d))
self.assertEqual(r.response_status, t['response']['response_status'])
if 'number_of_commands' in t['response']:
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
if 'last_status_word' in t['response']:
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
if 'last_response_data' in t['response']:
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,301 +0,0 @@
#!/usr/bin/env python3
#
# This program receive APDUs via the VPCD protocol of Frank Morgner's
# virtualsmartcard, encrypts them with OTA (over the air) keys and
# forwards them via SMPP to a SMSC (SMS service centre).
#
# In other words, you can use it as a poor man's OTA server, to enable
# you to use unmodified application software with PC/SC support to talk
# securely via OTA with a remote SMS card.
#
# This is very much a work in progress at this point.
#######################################################################
# twisted VPCD Library
#######################################################################
import logging
import struct
import abc
from typing import Union, Optional
from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
class VirtualCard(abc.ABC):
"""Abstract base class for a virtual smart card."""
def __init__(self, atr: Union[str, bytes]):
if isinstance(atr, str):
atr = h2b(atr)
self.atr = atr
@abc.abstractmethod
def power_change(self, new_state: bool):
"""Power the card on or off."""
pass
@abc.abstractmethod
def reset(self):
"""Reset the card."""
pass
@abc.abstractmethod
def rx_c_apdu(self, apdu: bytes):
"""Receive a C-APDU from the reader/application."""
pass
def tx_r_apdu(self, apdu: Union[str, bytes]):
if isinstance(apdu, str):
apdu = h2b(apdu)
logger.info("R-APDU: %s" % b2h(apdu))
self.protocol.send_data(apdu)
class VpcdProtocolBase(Protocol):
# Prefixed couldn't be used as the this.length wouldn't be available in this case
construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
'data'/If(this.length > 1, Bytes(this.length)),
'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
def __init__(self, vcard: VirtualCard):
self.recvBuffer = b''
self.connectionCorrupted = False
self.pduReadTimer = None
self.pduReadTimerSecs = 10
self.callLater = reactor.callLater
self.on = False
self.vcard = vcard
self.vcard.protocol = self
def dataReceived(self, data: bytes):
"""entry point where twisted tells us data was received."""
#logger.debug('Data received: %s' % b2h(data))
self.recvBuffer = self.recvBuffer + data
while True:
if self.connectionCorrupted:
return
msg = self.readMessage()
if msg is None:
break
self.endPDURead()
self.rawMessageReceived(msg)
if len(self.recvBuffer) > 0:
self.incompletePDURead()
def incompletePDURead(self):
"""We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
if self.pduReadTimer and self.pduReadTimer.active():
return
self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
def endPDURead(self):
"""We completed reading a PDU, cancel the pduReadTimer."""
if self.pduReadTimer and self.pduReadTimer.active():
self.pduReadTimer.cancel()
def readMessage(self) -> Optional[bytes]:
"""read an entire [raw] message."""
pduLen = self._getMessageLength()
if pduLen is None:
return None
return self._getMessage(pduLen)
def _getMessageLength(self) -> Optional[int]:
if len(self.recvBuffer) < 2:
return None
return struct.unpack('!H', self.recvBuffer[:2])[0]
def _getMessage(self, pduLen: int) -> Optional[bytes]:
if len(self.recvBuffer) < pduLen+2:
return None
message = self.recvBuffer[:pduLen+2]
self.recvBuffer = self.recvBuffer[pduLen+2:]
return message
def onPDUReadTimeout(self):
logger.error('PDU read timed out. Buffer is now considered corrupt')
#self.coruptDataReceived
def rawMessageReceived(self, message: bytes):
"""Called once a complete binary vpcd message has been received."""
pdu = None
try:
pdu = VpcdProtocolBase.construct.parse(message)
except Exception as e:
logger.exception(e)
logger.critical('Received corrupt PDU %s' % b2h(message))
#self.corupDataRecvd()
else:
self.PDUReceived(pdu)
def PDUReceived(self, pdu):
logger.debug("Rx PDU: %s" % pdu)
if pdu['data']:
return self.on_rx_data(pdu)
else:
method = getattr(self, 'on_rx_' + pdu['ctrl'])
return method(pdu)
def on_rx_atr(self, pdu):
self.send_data(self.vcard.atr)
def on_rx_on(self, pdu):
if self.on:
return
else:
self.on = True
self.vcard.power_change(self.on)
def on_rx_reset(self, pdu):
self.vcard.reset()
def on_rx_off(self, pdu):
if not self.on:
return
else:
self.on = False
self.vcard.power_change(self.on)
def on_rx_data(self, pdu):
self.vcard.rx_c_apdu(pdu['data'])
def send_pdu(self, pdu):
logger.debug("Sending PDU: %s" % pdu)
encoded = VpcdProtocolBase.construct.build(pdu)
#logger.debug("Sending binary: %s" % b2h(encoded))
self.transport.write(encoded)
def send_data(self, data: Union[str, bytes]):
if isinstance(data, str):
data = h2b(data)
return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
def send_ctrl(self, ctrl: str):
return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
class VpcdProtocolClient(VpcdProtocolBase):
pass
class VpcdClientFactory(ReconnectingClientFactory):
def __init__(self, vcard_class: VirtualCard):
self.vcard_class = vcard_class
def startedConnecting(self, connector):
logger.debug('Started to connect')
def buildProtocol(self, addr):
logger.info('Connection established to %s' % addr)
self.resetDelay()
return VpcdProtocolClient(vcard = self.vcard_class())
def clientConnectionLost(self, connector, reason):
logger.warning('Connection lost (reason: %s)' % reason)
super().clientConnectionLost(connector, reason)
def clientConnectionFailed(self, connector, reason):
logger.warning('Connection failed (reason: %s)' % reason)
super().clientConnectionFailed(connector, reason)
#######################################################################
# Application
#######################################################################
from pprint import pprint as pp
from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
from twisted.internet import reactor
from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
from smpp.twisted.protocol import SMPPClientProtocol
from smpp.twisted.config import SMPPClientConfig
from smpp.pdu.operations import SubmitSM, DeliverSM
from smpp.pdu import pdu_types
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
class MyVcard(VirtualCard):
def __init__(self, **kwargs):
super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
self.smpp_client = None
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
self.ota_dialect = OtaDialectSms()
self.tar = h2b('B00011')
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def ensure_smpp(self):
config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
if self.smpp_client:
return
self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
smpp = self.smpp_client.connectAndBind()
#self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
#d = self.smpp.connectTCP(config.host, config.port)
#d = self.smpp.connectAndBind()
#d.addCallback(self.forwardToClient, self.smpp)
def power_change(self, new_state: bool):
if new_state:
logger.info("POWER ON")
self.ensure_smpp()
else:
logger.info("POWER OFF")
def reset(self):
logger.info("RESET")
def rx_c_apdu(self, apdu: bytes):
pp(self.smpp_client.smpp)
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
#self.tx_r_apdu('9000')
def tx_sms_tpdu(self, tpdu: bytes):
"""Send a SMS TPDU via SMPP SubmitSM."""
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
protocol_id=0x7f, short_message=tpdu)
self.smpp_client.smpp.sendDataRequest(submit)
def handleSmpp(self, smpp, pdu):
#logger.info("Received SMPP %s" % pdu)
data = pdu.params['short_message']
#logger.info("Received SMS Data %s" % b2h(data))
r, d = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
logger.info("Decoded SMPP %s" % r)
self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
if __name__ == '__main__':
import logging
logger = logging.getLogger(__name__)
import colorlog
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
colorlog.basicConfig(level=logging.INFO, format = log_format)
logger = colorlog.getLogger()
from twisted.internet import reactor
host = 'localhost'
port = 35963
reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
reactor.run()