mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-04-10 15:01:05 +03:00
Compare commits
59 Commits
osmith/sai
...
neels/wip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d28cc512a | ||
|
|
53bf1f3501 | ||
|
|
8572181e41 | ||
|
|
7cadab94f8 | ||
|
|
ecfcf6057a | ||
|
|
7398327e1c | ||
|
|
b9edcb0fb7 | ||
|
|
292bf38942 | ||
|
|
745a60b63b | ||
|
|
f62ae7bd17 | ||
|
|
f9a53434ac | ||
|
|
a01eeec5c7 | ||
|
|
abde8db5e1 | ||
|
|
76eddbe4b8 | ||
|
|
19601a8d81 | ||
|
|
1c45cff351 | ||
|
|
e1beab83af | ||
|
|
be069ab63a | ||
|
|
67e695cedd | ||
|
|
37eea09c11 | ||
|
|
9e42ba6ba0 | ||
|
|
965ee38c05 | ||
|
|
ee03065663 | ||
|
|
8bf53239a1 | ||
|
|
f11ac56db1 | ||
|
|
93014f67ff | ||
|
|
c9786fa72e | ||
|
|
05cf68a4a4 | ||
|
|
5544a0f7c9 | ||
|
|
76d4ff8842 | ||
|
|
1d5f18a747 | ||
|
|
2ba685fdea | ||
|
|
9f18a0ff56 | ||
|
|
48022c94a0 | ||
|
|
07faf3aaa7 | ||
|
|
bd358a2621 | ||
|
|
2347b47e79 | ||
|
|
8fd3487f86 | ||
|
|
1f50f29546 | ||
|
|
f77a602139 | ||
|
|
1e0b3b35d4 | ||
|
|
ee0853a149 | ||
|
|
a5a5865c7c | ||
|
|
3752aeb94e | ||
|
|
914abe3309 | ||
|
|
84754b6ebb | ||
|
|
c47005d408 | ||
|
|
2dfaac6e4f | ||
|
|
a615ba5138 | ||
|
|
8ee10ab1a5 | ||
|
|
f10af30aed | ||
|
|
d8f3c78135 | ||
|
|
6b9b46a5a4 | ||
|
|
b6b4501e37 | ||
|
|
54658fa3a9 | ||
|
|
eb04bb1082 | ||
|
|
453fde5a3a | ||
|
|
57237b650e | ||
|
|
1f94791240 |
@@ -30,6 +30,48 @@ from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(Path(__file__).stem)
|
||||
|
||||
option_parser = argparse.ArgumentParser(description='Tool to send OTA SMS RFM/RAM messages via SMPP',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost")
|
||||
option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int)
|
||||
option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
|
||||
algo_crypt_choices = []
|
||||
algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
|
||||
for cls in algo_crypt_classes:
|
||||
algo_crypt_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2',
|
||||
help="OTA crypt algorithm")
|
||||
algo_auth_choices = []
|
||||
algo_auth_classes = OtaAlgoAuth.__subclasses__()
|
||||
for cls in algo_auth_classes:
|
||||
algo_auth_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2',
|
||||
help="OTA auth algorithm")
|
||||
option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)')
|
||||
option_parser.add_argument('--kic-idx', default=1, type=int, help='OTA key index (KIC)')
|
||||
option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)')
|
||||
option_parser.add_argument('--kid-idx', default=1, type=int, help='OTA key index (KID)')
|
||||
option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter')
|
||||
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
|
||||
option_parser.add_argument("--cntr-req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
|
||||
help="Counter requirement")
|
||||
option_parser.add_argument('--no-ciphering', action='store_true', default=False, help='Disable ciphering')
|
||||
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument('--por-in-submit', action='store_true', default=False,
|
||||
help='require PoR to be sent via SMS-SUBMIT')
|
||||
option_parser.add_argument('--por-no-ciphering', action='store_true', default=False, help='Disable ciphering (PoR)')
|
||||
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument("--por-req", choices=POR_REQ.decmapping.values(), default='por_required',
|
||||
help="Proof of Receipt requirements")
|
||||
option_parser.add_argument('--src-addr', default='12', type=str, help='SMS source address (MSISDN)')
|
||||
option_parser.add_argument('--dest-addr', default='23', type=str, help='SMS destination address (MSISDN)')
|
||||
option_parser.add_argument('--timeout', default=10, type=int, help='Maximum response waiting time')
|
||||
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
|
||||
|
||||
class SmppHandler:
|
||||
client = None
|
||||
|
||||
@@ -141,7 +183,7 @@ class SmppHandler:
|
||||
tuple containing the last response data and the last status word as byte strings
|
||||
"""
|
||||
|
||||
logger.info("C-APDU sending: %s..." % b2h(apdu))
|
||||
logger.info("C-APDU sending: %s...", b2h(apdu))
|
||||
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
@@ -167,65 +209,28 @@ class SmppHandler:
|
||||
return h2b(resp), h2b(sw)
|
||||
|
||||
if __name__ == '__main__':
|
||||
option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost")
|
||||
option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int)
|
||||
option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test")
|
||||
option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False)
|
||||
algo_crypt_choices = []
|
||||
algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
|
||||
for cls in algo_crypt_classes:
|
||||
algo_crypt_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2',
|
||||
help="OTA crypt algorithm")
|
||||
algo_auth_choices = []
|
||||
algo_auth_classes = OtaAlgoAuth.__subclasses__()
|
||||
for cls in algo_auth_classes:
|
||||
algo_auth_choices.append(cls.enum_name)
|
||||
option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2',
|
||||
help="OTA auth algorithm")
|
||||
option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)')
|
||||
option_parser.add_argument('--kic_idx', default=1, type=int, help='OTA key index (KIC)')
|
||||
option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)')
|
||||
option_parser.add_argument('--kid_idx', default=1, type=int, help='OTA key index (KID)')
|
||||
option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter')
|
||||
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
|
||||
option_parser.add_argument("--cntr_req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
|
||||
help="Counter requirement")
|
||||
option_parser.add_argument('--ciphering', default=True, type=bool, help='Enable ciphering')
|
||||
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument('--por-in-submit', default=False, type=bool,
|
||||
help='require PoR to be sent via SMS-SUBMIT')
|
||||
option_parser.add_argument('--por-shall-be-ciphered', default=True, type=bool, help='require encrypted PoR')
|
||||
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument("--por_req", choices=POR_REQ.decmapping.values(), default='por_required',
|
||||
help="Proof of Receipt requirements")
|
||||
option_parser.add_argument('--src-addr', default='12', type=str, help='TODO')
|
||||
option_parser.add_argument('--dest-addr', default='23', type=str, help='TODO')
|
||||
option_parser.add_argument('--timeout', default=10, type=int, help='TODO')
|
||||
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)s %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
|
||||
if opts.kic_idx != opts.kid_idx:
|
||||
logger.warning("KIC index (%s) and KID index (%s) are different (security violation, card should reject message)",
|
||||
opts.kic_idx, opts.kid_idx)
|
||||
|
||||
ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
|
||||
kic_idx=opts.kic_idx,
|
||||
kic=h2b(opts.kic),
|
||||
algo_auth=opts.algo_auth,
|
||||
kid_idx=opts.kic_idx,
|
||||
kid_idx=opts.kid_idx,
|
||||
kid=h2b(opts.kid),
|
||||
cntr=opts.cntr)
|
||||
spi = {'counter' : opts.cntr_req,
|
||||
'ciphering' : opts.ciphering,
|
||||
'ciphering' : not opts.no_ciphering,
|
||||
'rc_cc_ds': opts.rc_cc_ds,
|
||||
'por_in_submit':opts.por_in_submit,
|
||||
'por_shall_be_ciphered':opts.por_shall_be_ciphered,
|
||||
'por_in_submit': opts.por_in_submit,
|
||||
'por_shall_be_ciphered': not opts.por_no_ciphering,
|
||||
'por_rc_cc_ds': opts.por_rc_cc_ds,
|
||||
'por': opts.por_req}
|
||||
apdu = h2b("".join(opts.apdu))
|
||||
|
||||
@@ -48,6 +48,7 @@ pySim consists of several parts:
|
||||
sim-rest
|
||||
suci-keytool
|
||||
saip-tool
|
||||
smpp-ota-tool
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
||||
179
docs/smpp-ota-tool.rst
Normal file
179
docs/smpp-ota-tool.rst
Normal file
@@ -0,0 +1,179 @@
|
||||
smpp-ota-tool
|
||||
=============
|
||||
|
||||
The `smpp-ota-tool` allows users to send OTA SMS messages containing APDU scripts (RFM, RAM) via an SMPP server. The
|
||||
intended audience are developers who want to test/evaluate the OTA SMS interface of a SIM/UICC/eUICC. `smpp-ota-tool`
|
||||
is intended to be used as a companion tool for :ref:`pySim-smpp2sim`, however it should be usable on any other SMPP
|
||||
server (such as a production SMSC of a live cellular network) as well.
|
||||
|
||||
From the technical perspective `smpp-ota-tool` takes the role of an SMPP ESME. It takes care of the encoding, encryption
|
||||
and checksumming (signing) of the RFM/RAM OTA SMS and eventually submits it to the SMPP server. The program then waits
|
||||
for a response. The response is automatically parsed and printed on stdout. This makes the program also suitable to be
|
||||
called from shell scripts.
|
||||
|
||||
.. note:: In the following we will we will refer to `SIM` as one of the following: `SIM`, `USIM`, `ISIM`, `UICC`,
|
||||
`eUICC`, `eSIM`.
|
||||
|
||||
Applying OTA keys
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
Depending on the `SIM` type you will receive one or more sets of keys which you can use to communicate with the `SIM`
|
||||
through a secure channel protocol. When using the OTA SMS method, the SCP80 protocol is used and it therefore crucial
|
||||
to use a keyset that is actually suitable for SCP80.
|
||||
|
||||
A keyset usually consists of three keys:
|
||||
|
||||
#. KIC: the key used for ciphering (encryption/decryption)
|
||||
#. KID: the key used to compute a cryptographic checksum (signing)
|
||||
#. KIK: the key used to encrypt/decrypt key material (key rotation, adding of new keys)
|
||||
|
||||
From the transport security perspective, only KIC and KID are relevant. The KIK (also referenced as "Data Encryption
|
||||
Key", DEK) is only used when keys are rotated or new keys are added (see also ETSI TS 102 226, section 8.2.1.5).
|
||||
|
||||
When the keyset is programmed into the security domain of the `SIM`, it is tied to a specific cryptographic algorithm
|
||||
(3DES, AES128 or AES256) and a so called Key Version Number (KVN). The term "Key Version Number" is misleading, since
|
||||
it is actually not a version number. It is a unique identifier of a certain keyset which also identifies for which
|
||||
secure channel protocol the keyset may be used. Keysets with a KVN from 1-15 (``0x01``-``0x0F``) are suitable for SCP80.
|
||||
This means that it is not only important to know just the KIC/KID/KIK keys. Also the related algorithms and the KVN
|
||||
numbers must be known.
|
||||
|
||||
.. note:: SCP80 keysets typically start counting from 1 upwards. Typical configurations use a set of 3 keysets with
|
||||
KVN numbers 1-3.
|
||||
|
||||
Addressing an Application
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
When communicating with a specific application on a `SIM` via SCP80, it is important to address that application with
|
||||
the correct parameters. The following two parameters must be known in advance:
|
||||
|
||||
#. TAR: The Toolkit Application Reference (TAR) number is a three byte value that uniquely addresses an application
|
||||
on the `SIM`. The exact values may vary (see also ETSI TS 101 220, Table D.1).
|
||||
#. MSL: The Minimum Security Level (MSL) is a bit-field that dictates which of the security measures encoded in the
|
||||
SPI are mandatory (see also ETSI TS 102 225, section 5.1.1).
|
||||
|
||||
A practical example
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. note:: This tutorial assumes that pySim-smpp2sim is running on the local machine with its default parameters.
|
||||
See also :ref:`pySim-smpp2sim`.
|
||||
|
||||
Let's assume that an OTA SMS shall be sent to the SIM RFM application of an sysmoISIM-SJA2. What we want to do is to
|
||||
select DF.GSM and to get the select response back.
|
||||
|
||||
We have received the following key material from the `SIM` vendor:
|
||||
|
||||
::
|
||||
|
||||
KIC1: F09C43EE1A0391665CC9F05AF4E0BD10
|
||||
KID1: 01981F4A20999F62AF99988007BAF6CA
|
||||
KIK1: 8F8AEE5CDCC5D361368BC45673D99195
|
||||
KIC2: 01022916E945B656FDE03F806A105FA2
|
||||
KID2: D326CB69F160333CC5BD1495D448EFD6
|
||||
KIK2: 08037E0590DFE049D4975FFB8652F625
|
||||
KIC3: 2B22824D0D27A3A1CEEC512B312082B4
|
||||
KID3: F1697766925A11F4458295590137B672
|
||||
KIK3: C7EE69B2C5A1C8E160DD36A38EB517B3
|
||||
|
||||
Those are three keysets. The enumeration is directly equal to the KVN used. All three keysets are 3DES keys, which
|
||||
means triple_des_cbc2 is the correct algorithm to use.
|
||||
|
||||
.. note:: The key set configuration can be confirmed by retrieving the key configuration using
|
||||
`get_data key_information` from within an SCP02 session on ADF.ISD.
|
||||
|
||||
In this example we intend to address the SIM RFM application on the `SIM`. Which according to the manual has TAR ``B00010``
|
||||
and MSL ``0x06``. When we hold ``0x06`` = ``0b00000110`` against the SPI coding chart (see also ETSI TS 102 225,
|
||||
section 5.1.1). We can deduct that Ciphering and Cryptographic Checksum are mandatory.
|
||||
|
||||
.. note:: The MSL (see also ETSI TS 102 226, section 6.1) is assigned to an application by the `SIM` issuer. It is a
|
||||
custom decision and may vary with different `SIM` types/profiles. In the case of sysmoISIM-SJS1/SJA2/SJA5 the
|
||||
counter requirement has been waived to simplify lab/research type use. In productive environments, `SIM`
|
||||
applications should ideally use an MSL that makes the counter mandatory.
|
||||
|
||||
In order to select DF.GSM (``0x7F20``) and to retrieve the select response, two APDUs are needed. The first APDU is the
|
||||
select command ``A0A40000027F20`` and the second is the related get-response command ``A0C0000016``. Those APDUs will be
|
||||
concatenated and are sent in a single message. The message containing the concatenated APDUs works as a script that
|
||||
is received by the SIM RFM application and then executed. This method poses some limitations that have to be taken into
|
||||
account when making requests like this (see also ETSI TS 102 226, section 5).
|
||||
|
||||
With this information we may now construct a commandline for `smpp-ota-tool.py`. We will pass the KVN as kid_idx and
|
||||
kic_idx (see also ETSI TS 102 225, Table 2, fields `KIc` and `KID`). Both index values should refer to the same
|
||||
keyset/KVN as keysets should not be mixed. (`smpp-ota-tool` still provides separate parameters anyway to allow testing
|
||||
with invalid keyset combinations)
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016
|
||||
2026-02-26 17:13:56 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:13:56 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU sending: 02700000281506191515b00010da1d6cbbd0d11ce4330d844c7408340943e843f67a6d7b0674730881605fd62d...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU received: 027100002c12b000107ddf58d1780f771638b3975759f4296cf5c31efc87a16a1b61921426baa16da1b5ba1a9951d59a39
|
||||
2026-02-26 17:13:56 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\x8f\xea\xf5.\xf4\x0e\xc2\x14', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
|
||||
2026-02-26 17:13:56 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
2026-02-26 17:13:56 INFO Disconnecting...
|
||||
|
||||
The result we see is the select response of DF.GSM and a status word indicating that the last command has been
|
||||
processed normally.
|
||||
|
||||
As we can see, this mechanism now allows us to perform small administrative tasks remotely. We can read the contents of
|
||||
files remotely or make changes to files. Depending on the changes we make, there may be security issues arising from
|
||||
replay attacks. With the commandline above, the communication is encrypted and protected by a cryptographic checksum,
|
||||
so an adversary can neither read, nor alter the message. However, an adversary could still replay an intercepted
|
||||
message and the `SIM` would happily execute the contained APDUs again.
|
||||
|
||||
To prevent this, we may include a replay protection counter within the message. In this case, the MSL indicates that a
|
||||
replay protection counter is not required. However, to extended the security of our messages, we may chose to use a
|
||||
counter anyway. In the following example, we will encode a counter value of 100. We will instruct the `SIM` to make sure
|
||||
that the value we send is higher than the counter value that is currently stored in the `SIM`.
|
||||
|
||||
To add a replay connection counter we add the commandline arguments `--cntr-req` to set the counter requirement and
|
||||
`--cntr` to pass the counter value.
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
|
||||
2026-02-26 17:16:39 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:16:39 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU received: 027100002c12b0001049fb0315f6c6401b553867f412cefaf9355b38271178edb342a3bc9cc7e670cdc1f45eea6ffcbb39
|
||||
2026-02-26 17:16:39 INFO SMS-TPDU decoded: (Container(rpl=44, rhl=18, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00d', pcntr=0, response_status=uEnumIntegerString.new(0, 'por_ok'), cc_rc=b'\xa9/\xc7\xc9\x00"\xab5', secured_data=b'\x02\x90\x00\x00\x00\xff\xff\x7f \x02\x00\x00\x00\x00\x00\t\xb1\x065\x04\x00\x83\x8a\x83\x8a'), Container(number_of_commands=2, last_status_word=u'9000', last_response_data=u'0000ffff7f2002000000000009b106350400838a838a'))
|
||||
2026-02-26 17:16:39 INFO R-APDU received: 0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
0000ffff7f2002000000000009b106350400838a838a 9000
|
||||
2026-02-26 17:16:39 INFO Disconnecting...
|
||||
|
||||
The `SIM` has accepted the message. The message got processed and the `SIM` has set its internal to 100. As an experiment,
|
||||
we may try to re-use the counter value:
|
||||
|
||||
::
|
||||
|
||||
$ PYTHONPATH=./ ./contrib/smpp-ota-tool.py --kic F09C43EE1A0391665CC9F05AF4E0BD10 --kid 01981F4A20999F62AF99988107BAF6CA --kid_idx 1 --kic_idx 1 --algo-crypt triple_des_cbc2 --algo-auth triple_des_cbc2 --tar B00010 --apdu A0A40000027F20 --apdu A0C0000016 --cntr-req counter_must_be_higher --cntr 100
|
||||
2026-02-26 17:16:43 INFO Connecting to localhost:2775...
|
||||
2026-02-26 17:16:43 INFO C-APDU sending: a0a40000027f20a0c0000016...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU sending: 02700000281516191515b000103a4f599e94f2b5dcfbbda984761b7977df6514c57a580fb4844787c436d2eade...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU sent, waiting for response...
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU received: 027100000b0ab0001000000000000006
|
||||
2026-02-26 17:16:43 INFO SMS-TPDU decoded: (Container(rpl=11, rhl=10, tar=b'\xb0\x00\x10', cntr=b'\x00\x00\x00\x00\x00', pcntr=0, response_status=uEnumIntegerString.new(6, 'undefined_security_error'), cc_rc=b'', secured_data=b''), None)
|
||||
Traceback (most recent call last):
|
||||
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 238, in <module>
|
||||
resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
File "/home/user/work/git_master/pysim/./contrib/smpp-ota-tool.py", line 162, in transceive_apdu
|
||||
raise ValueError("Response does not contain any last_response_data, no R-APDU received!")
|
||||
ValueError: Response does not contain any last_response_data, no R-APDU received!
|
||||
2026-02-26 17:16:43 INFO Disconnecting...
|
||||
|
||||
As we can see, the `SIM` has rejected the message with an `undefined_security_error`. The replay-protection-counter
|
||||
ensures that a message can only be sent once.
|
||||
|
||||
.. note:: The replay-protection-counter is implemented as a 5 byte integer value (see also ETSI TS 102 225, Table 3).
|
||||
When the counter has reached its maximum, it will not overflow nor can it be reset.
|
||||
|
||||
smpp-ota-tool syntax
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. argparse::
|
||||
:module: contrib.smpp-ota-tool
|
||||
:func: option_parser
|
||||
:prog: contrib/smpp-ota-tool.py
|
||||
@@ -55,3 +55,5 @@ And once your external program is sending SMS to the simulated SMSC, it will log
|
||||
SMSPPDownload(DeviceIdentities({'source_dev_id': 'network', 'dest_dev_id': 'uicc'}),Address({'ton_npi': 0, 'call_number': '0123456'}),SMS_TPDU({'tpdu': '400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d'}))
|
||||
INFO root: ENVELOPE: d147820283818604001032548b3b400290217ff6227052000000002d02700000281516191212b0000127fa28a5bac69d3c5e9df2c7155dfdde449c826b236215566530787b30e8be5d
|
||||
INFO root: SW 9000: 027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c
|
||||
|
||||
.. note:: for sending OTA SMS messages :ref:`smpp-ota-tool` may be used.
|
||||
|
||||
@@ -128,7 +128,7 @@ class EF_AD(TransparentEF):
|
||||
cell_test = 0x04
|
||||
|
||||
def __init__(self, fid='6f43', sfid=None, name='EF.AD',
|
||||
desc='Service Provider Name', size=(3, None), **kwargs):
|
||||
desc='Administrative Data', size=(3, None), **kwargs):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, **kwargs)
|
||||
self._construct = Struct(
|
||||
# Byte 1: Display Condition
|
||||
|
||||
@@ -16,12 +16,6 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import requests
|
||||
from klein import Klein
|
||||
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
|
||||
from twisted.internet.posixbase import PosixReactorBase
|
||||
from pathlib import Path
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import time
|
||||
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
class DownloadOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/downloadOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType
|
||||
}
|
||||
input_mandatory = ['header']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'iccid': param.Iccid,
|
||||
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
|
||||
class ConfirmOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/confirmOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
'smdsAddress': param.SmdsAddress,
|
||||
'releaseFlag': param.ReleaseFlag,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'releaseFlag']
|
||||
input_mandatory = ['iccid', 'releaseFlag']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'eid': param.Eid,
|
||||
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
class CancelOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/cancelOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
|
||||
}
|
||||
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
|
||||
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
|
||||
class ReleaseProfile(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/releaseProfile'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid']
|
||||
input_mandatory = ['iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
|
||||
class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType,
|
||||
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
'notificationPointStatus': param.NotificationPointStatus,
|
||||
'resultData': param.ResultData,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
expected_http_status = 204
|
||||
|
||||
|
||||
class Es2pApiClient:
|
||||
"""Main class representing a full ES2+ API client. Has one method for each API function."""
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
|
||||
@@ -217,17 +206,18 @@ class Es2pApiClient:
|
||||
if client_cert:
|
||||
self.session.cert = client_cert
|
||||
|
||||
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
|
||||
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
|
||||
|
||||
def call_downloadOrder(self, data: dict) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(data, self._gen_func_id())
|
||||
@@ -247,116 +237,3 @@ class Es2pApiClient:
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
|
||||
|
||||
class Es2pApiServerHandlerSmdpp(abc.ABC):
|
||||
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_downloadOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_confirmOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_cancelOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_releaseProfile(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServerHandlerMno(abc.ABC):
|
||||
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServer(abc.ABC):
|
||||
"""Main class representing a full ES2+ API server. Has one method for each API function."""
|
||||
app = None
|
||||
|
||||
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
|
||||
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
|
||||
self.port = port
|
||||
self.interface = interface
|
||||
if server_cert:
|
||||
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
|
||||
else:
|
||||
self.server_cert = None
|
||||
if client_cert_verify:
|
||||
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
|
||||
else:
|
||||
self.client_cert_verify = None
|
||||
|
||||
def reactor(self, reactor: PosixReactorBase):
|
||||
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
|
||||
if self.server_cert:
|
||||
if self.client_cert_verify:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
|
||||
return defer.Deferred()
|
||||
|
||||
class Es2pApiServerSmdpp(Es2pApiServer):
|
||||
"""ES2+ (SMDP+ side) API Server."""
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
|
||||
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
|
||||
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
|
||||
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(DownloadOrder.path)
|
||||
def call_downloadOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(request)
|
||||
|
||||
@app.route(ConfirmOrder.path)
|
||||
def call_confirmOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
return self.confirmOrder.call(request)
|
||||
|
||||
@app.route(CancelOrder.path)
|
||||
def call_cancelOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
return self.cancelOrder.call(request)
|
||||
|
||||
@app.route(ReleaseProfile.path)
|
||||
def call_releaseProfile(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
return self.releaseProfile.call(request)
|
||||
|
||||
class Es2pApiServerMno(Es2pApiServer):
|
||||
"""ES2+ (MNO side) API Server."""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
|
||||
handler.call_handleDownloadProgressInfo)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(HandleDownloadProgressInfo.path)
|
||||
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(request)
|
||||
|
||||
@@ -155,11 +155,11 @@ class Es9pApiClient:
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
|
||||
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
|
||||
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
|
||||
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
@@ -21,8 +21,6 @@ import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
import base64
|
||||
from twisted.web.server import Request
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
|
||||
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
||||
raise ValueError('Unknown/unspecified status "%s"' % status)
|
||||
|
||||
class JsonRequestHeader(ApiParam):
|
||||
"""SGP.22 section 6.5.1.3."""
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
func_req_id = data.get('functionRequesterIdentifier')
|
||||
if not func_req_id:
|
||||
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
||||
func_call_id = data.get('functionCallIdentifier')
|
||||
if not func_call_id:
|
||||
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
||||
|
||||
class HttpStatusError(Exception):
|
||||
pass
|
||||
@@ -173,118 +161,65 @@ class ApiError(Exception):
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
||||
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
||||
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
||||
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
||||
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
||||
# prefix.
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder')
|
||||
path = None
|
||||
|
||||
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
||||
input_params = {}
|
||||
|
||||
# list of mandatory input parameters
|
||||
input_mandatory = []
|
||||
|
||||
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
||||
output_params = {}
|
||||
|
||||
# list of mandatory output parameters (for successful response)
|
||||
output_mandatory = []
|
||||
|
||||
# list of mandatory output parameters (for failed response)
|
||||
output_mandatory_failed = []
|
||||
|
||||
# expected HTTP status code of the response
|
||||
expected_http_status = 200
|
||||
|
||||
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
||||
http_method = 'POST'
|
||||
|
||||
# additional custom HTTP headers (client requests)
|
||||
extra_http_req_headers = {}
|
||||
|
||||
# additional custom HTTP headers (server responses)
|
||||
extra_http_res_headers = {}
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def __new__(cls, *args, role = 'legacy_client', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
args: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
|
||||
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
"""
|
||||
|
||||
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
|
||||
# decode_ methods below). The dictionary will not include any dunder/magic methods
|
||||
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
|
||||
|
||||
# Normal instantiation as JsonHttpApiFunction:
|
||||
if len(args) == 0 and len(kwargs) == 0:
|
||||
return type(cls.__name__, (abc.ABC,), cls_attr)()
|
||||
|
||||
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
|
||||
if role == 'legacy_client':
|
||||
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
|
||||
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
|
||||
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
|
||||
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
|
||||
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
result.legacy = True
|
||||
return result
|
||||
elif role == 'client':
|
||||
# Create a JsonHttpApiFunction in client role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
elif role == 'server':
|
||||
# Create a JsonHttpApiFunction in server role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
|
||||
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
else:
|
||||
raise ValueError('Invalid role \'%s\' specified' % role)
|
||||
|
||||
def encode_client(self, data: dict) -> dict:
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
|
||||
# pylint: disable=no-member
|
||||
if hasattr(self, 'legacy') and self.legacy:
|
||||
output[p] = JsonRequestHeader.encode(v)
|
||||
else:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_client(self, data: dict) -> dict:
|
||||
def decode(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
if 'header' in self.output_params:
|
||||
# let's first do the header, it's special
|
||||
if not 'header' in data:
|
||||
raise ValueError('Mandatory output parameter "header" missing')
|
||||
hdr_class = self.output_params.get('header')
|
||||
output['header'] = hdr_class.decode(data['header'])
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
# we can only expect mandatory parameters to be present in case of successful execution
|
||||
for p in self.output_mandatory:
|
||||
if p == 'header':
|
||||
continue
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
@@ -296,171 +231,35 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def encode_server(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.output_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_server(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the request body."""
|
||||
output = {}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
class JsonHttpApiClient():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
||||
session: requests.Session):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
|
||||
func_req_id : function requestor id to use for requests
|
||||
session : session object (requests)
|
||||
"""
|
||||
self.api_func = api_func
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
||||
json-serializable dict. Output data is returned as json-deserialized dict."""
|
||||
|
||||
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
||||
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
||||
# field is checked by the encode_client method)
|
||||
if func_call_id:
|
||||
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_client(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
url = self.url_prefix + self.path
|
||||
encoded = json.dumps(self.encode(data, func_call_id))
|
||||
req_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
req_headers.update(self.api_func.extra_http_req_headers)
|
||||
req_headers.update(self.extra_http_req_headers)
|
||||
|
||||
# Perform HTTP request
|
||||
url = self.url_prefix + self.api_func.path
|
||||
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
||||
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
||||
logger.debug("HTTP RSP: %s" % (response.content))
|
||||
|
||||
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
||||
# SGP.22, section 6.5.1)
|
||||
if response.status_code != self.api_func.expected_http_status:
|
||||
if response.status_code != self.expected_http_status:
|
||||
raise HttpStatusError(response)
|
||||
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
raise HttpHeaderError(response)
|
||||
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
||||
raise HttpHeaderError(response)
|
||||
|
||||
# Decode response and return the result back to the caller
|
||||
if response.content:
|
||||
if response.headers.get('Content-Type').startswith('application/json'):
|
||||
output = self.api_func.decode_client(response.json())
|
||||
return self.decode(response.json())
|
||||
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
|
||||
output = { 'data': response.content.decode('utf-8') }
|
||||
else:
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
return { 'data': response.content.decode('utf-8') }
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
|
||||
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
||||
# (the presence of the header field is checked by the decode_client method)
|
||||
if 'header' in output:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
return output
|
||||
return None
|
||||
|
||||
class JsonHttpApiServer():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
call_handler : handler function to process the request. This function must accept the
|
||||
decoded request as a dictionary. The handler function must return a tuple consisting
|
||||
of the response in the form of a dictionary (may be empty), and a function execution
|
||||
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
||||
"""
|
||||
self.api_func = api_func
|
||||
if call_handler:
|
||||
self.call_handler = call_handler
|
||||
else:
|
||||
self.call_handler = self.default_handler
|
||||
|
||||
def default_handler(self, data: dict) -> (dict, str):
|
||||
"""default handler, used in case no call handler is provided."""
|
||||
logger.error("no handler function for request: %s" % str(data))
|
||||
return {}, 'Failed'
|
||||
|
||||
def call(self, request: Request) -> str:
|
||||
""" Process an incoming request.
|
||||
Args:
|
||||
request : request object as received using twisted.web.server
|
||||
Returns:
|
||||
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
||||
provided the request object)
|
||||
"""
|
||||
|
||||
# Make sure the request is done with the correct HTTP method
|
||||
if (request.method.decode() != self.api_func.http_method):
|
||||
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
||||
|
||||
# Decode the request
|
||||
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
||||
|
||||
# Run call handler (see above)
|
||||
data, fe_status = self.call_handler(decoded_request)
|
||||
|
||||
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
||||
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
||||
if fe_status:
|
||||
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_server(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
res_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
res_headers.update(self.api_func.extra_http_res_headers)
|
||||
for header, value in res_headers.items():
|
||||
request.setHeader(header, value)
|
||||
request.setResponseCode(self.api_func.expected_http_status)
|
||||
|
||||
# Return the encoded result back to the caller for sending (using twisted/klein)
|
||||
return encoded
|
||||
|
||||
|
||||
@@ -151,6 +151,8 @@ class File:
|
||||
self.df_name = None
|
||||
self.fill_pattern = None
|
||||
self.fill_pattern_repeat = False
|
||||
self.pstdo = None # pinStatusTemplateDO, mandatory for DF/ADF
|
||||
self.lcsi = None # optional life cycle status indicator
|
||||
# apply some defaults from profile
|
||||
if self.template:
|
||||
self.from_template(self.template)
|
||||
@@ -278,6 +280,8 @@ class File:
|
||||
elif self.file_type in ['MF', 'DF', 'ADF']:
|
||||
fdb_dec['file_type'] = 'df'
|
||||
fdb_dec['structure'] = 'no_info_given'
|
||||
# pinStatusTemplateDO is mandatory for DF/ADF
|
||||
fileDescriptor['pinStatusTemplateDO'] = self.pstdo
|
||||
# build file descriptor based on above input data
|
||||
fd_dict = {}
|
||||
if len(fdb_dec):
|
||||
@@ -304,6 +308,8 @@ class File:
|
||||
# desired fill or repeat pattern in the "proprietaryEFInfo" element for the EF in Profiles
|
||||
# downloaded to a V2.2 or earlier eUICC.
|
||||
fileDescriptor['proprietaryEFInfo'] = pefi
|
||||
if self.lcsi:
|
||||
fileDescriptor['lcsi'] = self.lcsi
|
||||
logger.debug("%s: to_fileDescriptor(%s)" % (self, fileDescriptor))
|
||||
return fileDescriptor
|
||||
|
||||
@@ -323,6 +329,8 @@ class File:
|
||||
if efFileSize:
|
||||
self._file_size = self._decode_file_size(efFileSize)
|
||||
|
||||
self.pstdo = fileDescriptor.get('pinStatusTemplateDO', None)
|
||||
self.lcsi = fileDescriptor.get('lcsi', None)
|
||||
pefi = fileDescriptor.get('proprietaryEFInfo', {})
|
||||
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
|
||||
if securityAttributesReferenced:
|
||||
|
||||
@@ -57,15 +57,18 @@ class BatchPersonalization:
|
||||
"""
|
||||
|
||||
class ParamAndSrc:
|
||||
'tie a ConfigurableParameter to a source of actual values'
|
||||
"""tie a ConfigurableParameter to a source of actual values"""
|
||||
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
|
||||
self.param = param
|
||||
if isinstance(param, type):
|
||||
self.param_cls = param
|
||||
else:
|
||||
self.param_cls = param.__class__
|
||||
self.src = src
|
||||
|
||||
def __init__(self,
|
||||
n: int,
|
||||
src_pes: ProfileElementSequence,
|
||||
params: list[ParamAndSrc]=None,
|
||||
params: list[ParamAndSrc]=[],
|
||||
csv_rows: Generator=None,
|
||||
):
|
||||
"""
|
||||
@@ -74,10 +77,10 @@ class BatchPersonalization:
|
||||
copied.
|
||||
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
|
||||
profile values.
|
||||
csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column
|
||||
headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||
ParamSource.get_next(), such that ParamSource implementations can access the row items.
|
||||
See param_source.CsvSource.
|
||||
csv_rows: A generator (e.g. iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
|
||||
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
|
||||
param_source.CsvSource.
|
||||
"""
|
||||
self.n = n
|
||||
self.params = params or []
|
||||
@@ -85,7 +88,7 @@ class BatchPersonalization:
|
||||
self.csv_rows = csv_rows
|
||||
|
||||
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
|
||||
self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
|
||||
self.params.append(BatchPersonalization.ParamAndSrc(param, src))
|
||||
|
||||
def generate_profiles(self):
|
||||
# get first row of CSV: column names
|
||||
@@ -112,10 +115,10 @@ class BatchPersonalization:
|
||||
try:
|
||||
input_value = p.src.get_next(csv_row=csv_row)
|
||||
assert input_value is not None
|
||||
value = p.param.__class__.validate_val(input_value)
|
||||
p.param.__class__.apply_val(pes, value)
|
||||
value = p.param_cls.validate_val(input_value)
|
||||
p.param_cls.apply_val(pes, value)
|
||||
except Exception as e:
|
||||
raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}') from e
|
||||
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
|
||||
|
||||
yield pes
|
||||
|
||||
@@ -129,7 +132,7 @@ class UppAudit(dict):
|
||||
|
||||
@classmethod
|
||||
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
|
||||
'''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
|
||||
both 'IMSI' and 'IMSI-ACC' parameters.
|
||||
|
||||
@@ -151,7 +154,7 @@ class UppAudit(dict):
|
||||
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
|
||||
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
|
||||
unexpected / unusual kvn.
|
||||
'''
|
||||
"""
|
||||
|
||||
# make an instance of this class
|
||||
upp_audit = cls()
|
||||
@@ -317,7 +320,7 @@ class BatchAudit(list):
|
||||
return batch_audit
|
||||
|
||||
def to_csv_rows(self, headers=True, sort_key=None):
|
||||
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
|
||||
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
|
||||
columns = set()
|
||||
for audit in self:
|
||||
columns.update(audit.keys())
|
||||
|
||||
@@ -37,13 +37,10 @@ class ParamSource:
|
||||
name = "none"
|
||||
numeric_base = None # or 10 or 16
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
"""Subclasses implement this:
|
||||
if a parameter source defines some string input magic, override this function.
|
||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
||||
so the user can enter '0000' to get a four digit random number."""
|
||||
return cls(s)
|
||||
def __init__(self, input_str:str):
|
||||
"""Subclasses should call super().__init__(input_str) before evaluating self.input_str. Each subclass __init__()
|
||||
may in turn manipulate self.input_str to apply expansions or decodings."""
|
||||
self.input_str = input_str
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
"""Subclasses implement this: return the next value from the parameter source.
|
||||
@@ -51,78 +48,81 @@ class ParamSource:
|
||||
This default implementation is an empty source."""
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, input_str:str):
|
||||
"""compatibility with earlier version of ParamSource. Just use the constructor."""
|
||||
return cls(input_str)
|
||||
|
||||
class ConstantSource(ParamSource):
|
||||
"""one value for all"""
|
||||
name = "constant"
|
||||
|
||||
def __init__(self, val:str):
|
||||
self.val = val
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
return self.val
|
||||
return self.input_str
|
||||
|
||||
class InputExpandingParamSource(ParamSource):
|
||||
|
||||
def __init__(self, input_str:str):
|
||||
super().__init__(input_str)
|
||||
self.input_str = self.expand_input_str(self.input_str)
|
||||
|
||||
@classmethod
|
||||
def expand_str(cls, s:str):
|
||||
def expand_input_str(cls, input_str:str):
|
||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||
if "*" not in s:
|
||||
return s
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
||||
if "*" not in input_str:
|
||||
return input_str
|
||||
# re: "XX * 123" with optional spaces
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", input_str)
|
||||
if len(tokens) < 3:
|
||||
return s
|
||||
return input_str
|
||||
parts = []
|
||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||
parts.append(unchanged)
|
||||
repeat = int(repeat_str)
|
||||
parts.append(snippet * repeat)
|
||||
return "".join(parts)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
return cls(cls.expand_str(s))
|
||||
return "".join(parts)
|
||||
|
||||
class DecimalRangeSource(InputExpandingParamSource):
|
||||
"""abstract: decimal numbers with a value range"""
|
||||
|
||||
numeric_base = 10
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
"""
|
||||
See also from_str().
|
||||
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||
"""Constructor to set up values from a (user entered) string: DecimalRangeSource(input_str).
|
||||
Constructor to set up values directly: DecimalRangeSource(num_digits=3, first_value=123, last_value=456)
|
||||
|
||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
||||
num_digits: fixed number of digits (possibly with leading zeros) to generate.
|
||||
first_value, last_value: the decimal range in which to provide digits.
|
||||
num_digits produces leading zeros when first_value..last_value are shorter.
|
||||
"""
|
||||
num_digits = int(num_digits)
|
||||
first_value = int(first_value)
|
||||
last_value = int(last_value)
|
||||
assert ((input_str is not None and (num_digits, first_value, last_value) == (None, None, None))
|
||||
or (input_str is None and None not in (num_digits, first_value, last_value)))
|
||||
|
||||
if input_str is not None:
|
||||
super().__init__(input_str)
|
||||
|
||||
input_str = self.input_str
|
||||
|
||||
if ".." in input_str:
|
||||
first_str, last_str = input_str.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = input_str.strip()
|
||||
last_str = None
|
||||
|
||||
num_digits = len(first_str)
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str if last_str is not None else "9" * num_digits)
|
||||
|
||||
assert num_digits > 0
|
||||
assert first_value <= last_value
|
||||
self.num_digits = num_digits
|
||||
self.val_first_last = (first_value, last_value)
|
||||
self.first_value = first_value
|
||||
self.last_value = last_value
|
||||
|
||||
def val_to_digit(self, val:int):
|
||||
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
|
||||
if ".." in s:
|
||||
first_str, last_str = s.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = s.strip()
|
||||
last_str = None
|
||||
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str) if last_str is not None else "9" * len(first_str)
|
||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
||||
|
||||
class RandomSourceMixin:
|
||||
random_impl = secrets.SystemRandom()
|
||||
|
||||
@@ -135,7 +135,7 @@ class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
|
||||
# try to generate random digits that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randint(*self.val_first_last)
|
||||
val = self.random_impl.randint(self.first_value, self.last_value)
|
||||
if val in RandomDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
@@ -150,9 +150,11 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
numeric_base = 16
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits):
|
||||
"""see from_str()"""
|
||||
num_digits = int(num_digits)
|
||||
def __init__(self, input_str:str):
|
||||
super().__init__(input_str)
|
||||
input_str = self.input_str
|
||||
|
||||
num_digits = len(input_str.strip())
|
||||
if num_digits < 1:
|
||||
raise ValueError("zero number of digits")
|
||||
# hex digits always come in two
|
||||
@@ -174,23 +176,20 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
|
||||
return b2h(val)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
return cls(num_digits=len(s.strip()))
|
||||
|
||||
class IncDigitSource(DecimalRangeSource):
|
||||
"""incrementing sequence of digits"""
|
||||
name = "incrementing decimal digits"
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
super().__init__(num_digits, first_value, last_value)
|
||||
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||
"""input_str: the first value to return, a string of an integer number with optional leading zero digits. The
|
||||
leading zero digits are preserved."""
|
||||
super().__init__(input_str, num_digits, first_value, last_value)
|
||||
self.next_val = None
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"""Restart from the first value of the defined range passed to __init__()."""
|
||||
self.next_val = self.val_first_last[0]
|
||||
self.next_val = self.first_value
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = self.next_val
|
||||
@@ -200,7 +199,7 @@ class IncDigitSource(DecimalRangeSource):
|
||||
returnval = self.val_to_digit(val)
|
||||
|
||||
val += 1
|
||||
if val > self.val_first_last[1]:
|
||||
if val > self.last_value:
|
||||
self.next_val = None
|
||||
else:
|
||||
self.next_val = val
|
||||
@@ -211,13 +210,15 @@ class CsvSource(ParamSource):
|
||||
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
|
||||
name = "from CSV"
|
||||
|
||||
def __init__(self, csv_column):
|
||||
def __init__(self, input_str:str):
|
||||
"""self.csv_column = input_str:
|
||||
column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
csv_column: column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
self.csv_column = csv_column
|
||||
"""Parse input_str into self.num_digits, self.first_value, self.last_value."""
|
||||
super().__init__(input_str)
|
||||
self.csv_column = self.input_str
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = None
|
||||
|
||||
@@ -22,9 +22,11 @@ import re
|
||||
import pprint
|
||||
from typing import List, Tuple, Generator, Optional
|
||||
|
||||
from construct.core import StreamError
|
||||
from osmocom.tlv import camel_to_snake
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
|
||||
from pySim.ts_31_102 import EF_AD
|
||||
from pySim.ts_51_011 import EF_SMSP
|
||||
from pySim.esim.saip import param_source
|
||||
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
|
||||
@@ -55,22 +57,6 @@ class ClassVarMeta(abc.ABCMeta):
|
||||
setattr(x, k, v)
|
||||
return x
|
||||
|
||||
def file_tuples_content_as_bytes(l: List[Tuple]) -> Optional[bytes]:
|
||||
"""linearize a list of fillFileContent / fillFileOffset tuples into a stream of bytes."""
|
||||
stream = io.BytesIO()
|
||||
for k, v in l:
|
||||
if k == 'doNotCreate':
|
||||
return None
|
||||
if k == 'fileDescriptor':
|
||||
pass
|
||||
elif k == 'fillFileOffset':
|
||||
stream.seek(v, os.SEEK_CUR)
|
||||
elif k == 'fillFileContent':
|
||||
stream.write(v)
|
||||
else:
|
||||
return ValueError("Unknown key '%s' in tuple list" % k)
|
||||
return stream.getvalue()
|
||||
|
||||
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
r"""Base class representing a part of the eSIM profile that is configurable during the
|
||||
personalization process (with dynamic data from elsewhere).
|
||||
@@ -344,6 +330,7 @@ class DecimalHexParam(DecimalParam):
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
assert isinstance(val, str)
|
||||
val = ''.join('%02x' % ord(x) for x in val)
|
||||
if cls.rpad is not None:
|
||||
c = cls.rpad_char
|
||||
@@ -353,7 +340,7 @@ class DecimalHexParam(DecimalParam):
|
||||
|
||||
@classmethod
|
||||
def decimal_hex_to_str(cls, val):
|
||||
'useful for get_values_from_pes() implementations of subclasses'
|
||||
"""useful for get_values_from_pes() implementations of subclasses"""
|
||||
if isinstance(val, bytes):
|
||||
val = b2h(val)
|
||||
assert isinstance(val, hexstr)
|
||||
@@ -633,7 +620,7 @@ class SmspTpScAddr(ConfigurableParameter):
|
||||
# ensure the parameter_indicators.tp_sc_addr is True
|
||||
ef_smsp_dec['parameter_indicators']['tp_sc_addr'] = True
|
||||
# re-encode into the File body
|
||||
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1)
|
||||
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1, 52)
|
||||
#print("SMSP (new): %s" % f_smsp.body)
|
||||
# re-generate the pe.decoded member from the File instance
|
||||
pe.file2pe(f_smsp)
|
||||
@@ -662,6 +649,71 @@ class SmspTpScAddr(ConfigurableParameter):
|
||||
yield { cls.name: cls.tuple_to_str((international, digits)) }
|
||||
|
||||
|
||||
class MncLen(ConfigurableParameter):
|
||||
"""MNC length. Must be either 2 or 3. Sets only the MNC length field in EF-AD (Administrative Data)."""
|
||||
name = 'MNC-LEN'
|
||||
allow_chars = '23'
|
||||
strip_chars = ' \t\r\n'
|
||||
numeric_base = 10
|
||||
max_len = 1
|
||||
min_len = 1
|
||||
example_input = '2'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
val = int(val)
|
||||
if val not in (2, 3):
|
||||
raise ValueError(f"MNC-LEN must be either 2 or 3, not {val!r}")
|
||||
return val
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
"""val must be an int: either 2 or 3"""
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
if not hasattr(pe, 'files'):
|
||||
continue
|
||||
# decode existing values
|
||||
f_ad = pe.files['ef-ad']
|
||||
if not f_ad.body:
|
||||
continue
|
||||
try:
|
||||
ef_ad = EF_AD()
|
||||
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
|
||||
except StreamError:
|
||||
continue
|
||||
if 'mnc_len' not in ef_ad_dec:
|
||||
continue
|
||||
# change mnc_len
|
||||
ef_ad_dec['mnc_len'] = val
|
||||
# re-encode into the File body
|
||||
f_ad.body = ef_ad.encode_bin(ef_ad_dec)
|
||||
pe.file2pe(f_ad)
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for naa in ('isim',):# 'isim', 'csim'):
|
||||
for pe in pes.get_pes_for_type(naa):
|
||||
if not hasattr(pe, 'files'):
|
||||
continue
|
||||
f_ad = pe.files.get('ef-ad', None)
|
||||
if f_ad is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
ef_ad = EF_AD()
|
||||
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
|
||||
except StreamError:
|
||||
continue
|
||||
|
||||
mnc_len = ef_ad_dec.get('mnc_len', None)
|
||||
if mnc_len is None:
|
||||
continue
|
||||
|
||||
yield { cls.name: str(mnc_len) }
|
||||
|
||||
|
||||
class SdKey(BinaryParam):
|
||||
"""Configurable Security Domain (SD) Key. Value is presented as bytes.
|
||||
Non-abstract implementations are generated in SdKey.generate_sd_key_classes"""
|
||||
|
||||
@@ -859,28 +859,22 @@ class ADF_SD(CardADF):
|
||||
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
||||
self._cmd.poutput("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!" % (total_size, block_nr))
|
||||
|
||||
install_cap_parser = argparse.ArgumentParser(usage='%(prog)s FILE [--install-parameters | --install-parameters-*]')
|
||||
install_cap_parser = argparse.ArgumentParser()
|
||||
install_cap_parser.add_argument('cap_file', type=str, metavar='FILE',
|
||||
help='JAVA-CARD CAP file to install')
|
||||
# Ideally, the parser should enforce that:
|
||||
# * either the `--install-parameters` is given alone,
|
||||
# * or distinct `--install-parameters-*` are optionally given instead.
|
||||
# We tried to achieve this using mutually exclusive groups (add_mutually_exclusive_group).
|
||||
# However, group nesting was never supported, often failed to work correctly, and was unintentionally
|
||||
# exposed through inheritance. It has been deprecated since version 3.11, removed in version 3.14.
|
||||
# Hence, we have to implement the enforcement manually.
|
||||
install_cap_parser_inst_prm_grp = install_cap_parser.add_argument_group('Install Parameters')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters', type=is_hexstr, default=None,
|
||||
help='install Parameters (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-non-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='non volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-stk',
|
||||
type=is_hexstr, default=None,
|
||||
help='Load Parameters (ETSI TS 102 226, section 8.2.1.3.2.1)')
|
||||
install_cap_parser_inst_prm_g = install_cap_parser.add_mutually_exclusive_group()
|
||||
install_cap_parser_inst_prm_g.add_argument('--install-parameters', type=is_hexstr, default=None,
|
||||
help='install Parameters (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp = install_cap_parser_inst_prm_g.add_argument_group()
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-non-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='non volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-stk',
|
||||
type=is_hexstr, default=None,
|
||||
help='Load Parameters (ETSI TS 102 226, section 8.2.1.3.2.1)')
|
||||
|
||||
@cmd2.with_argparser(install_cap_parser)
|
||||
def do_install_cap(self, opts):
|
||||
@@ -894,17 +888,9 @@ class ADF_SD(CardADF):
|
||||
load_file_aid = cap.get_loadfile_aid()
|
||||
module_aid = cap.get_applet_aid()
|
||||
application_aid = module_aid
|
||||
if opts.install_parameters is not None:
|
||||
# `--install-parameters` and `--install-parameters-*` are mutually exclusive
|
||||
# make sure that none of `--install-parameters-*` is given; abort otherwise
|
||||
if any(p is not None for p in [opts.install_parameters_non_volatile_memory_quota,
|
||||
opts.install_parameters_volatile_memory_quota,
|
||||
opts.install_parameters_stk]):
|
||||
self.install_cap_parser.error('arguments --install-parameters-* are '
|
||||
'not allowed with --install-parameters')
|
||||
if opts.install_parameters:
|
||||
install_parameters = opts.install_parameters;
|
||||
else:
|
||||
# `--install-parameters-*` are all optional
|
||||
install_parameters = gen_install_parameters(opts.install_parameters_non_volatile_memory_quota,
|
||||
opts.install_parameters_volatile_memory_quota,
|
||||
opts.install_parameters_stk)
|
||||
|
||||
@@ -266,11 +266,13 @@ class SCP02(SCP):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def dek_encrypt(self, plaintext:bytes) -> bytes:
|
||||
cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
|
||||
# See also GPC section B.1.1.2, E.4.7, and E.4.1
|
||||
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
|
||||
return cipher.encrypt(plaintext)
|
||||
|
||||
def dek_decrypt(self, ciphertext:bytes) -> bytes:
|
||||
cipher = DES.new(self.card_keys.dek[:8], DES.MODE_ECB)
|
||||
# See also GPC section B.1.1.2, E.4.7, and E.4.1
|
||||
cipher = DES3.new(self.sk.data_enc, DES.MODE_ECB)
|
||||
return cipher.decrypt(ciphertext)
|
||||
|
||||
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
|
||||
|
||||
@@ -2200,9 +2200,9 @@ update_record 6 fe0112ffb53e96e5ff99731d51ad7beafd0e23ffffffffffffffffffffffffff
|
||||
update_record 7 fe02101da012f436d06824ecdd15050419ff9affffffffffffffffffffffffffffffff
|
||||
update_record 8 fe02116929a373388ac904aff57ff57f6b3431ffffffffffffffffffffffffffffffff
|
||||
update_record 9 fe0212a99245a5dc814e2f4c1aa908e9946e03ffffffffffffffffffffffffffffffff
|
||||
update_record 10 fe0310521312c05a9aea93d70d44405172a580ffffffffffffffffffffffffffffffff
|
||||
update_record 11 fe0311a9e45c72d45abde7db74261ee0c11b1bffffffffffffffffffffffffffffffff
|
||||
update_record 12 fe0312867ba36b5873d60ea8b2cdcf3c0ddddaffffffffffffffffffffffffffffffff
|
||||
update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111
|
||||
update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222
|
||||
update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333
|
||||
#
|
||||
################################################################################
|
||||
# MF/DF.SYSTEM/EF.SIM_AUTH_COUNTER #
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
# Card parameter:
|
||||
ICCID="8949440000001155314"
|
||||
KIC='51D4FC44BCBA7C4589DFADA3297720AF'
|
||||
KID='0449699C472CE71E2FB7B56245EF7684'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
|
||||
TAR='B00010'
|
||||
APDU='A0A40000027F20A0C0000016'
|
||||
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
|
||||
@@ -20,13 +20,14 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
PYSIM_SHELL=./pySim-shell.py
|
||||
PYSIM_SHELL_LOG=./pySim-shell.log
|
||||
PYSIM_SMPP2SIM=./pySim-smpp2sim.py
|
||||
PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log
|
||||
PYSIM_SMPP2SIM_PORT=2775
|
||||
PYSIM_SMPP2SIM_TIMEOUT=10
|
||||
PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py
|
||||
PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log
|
||||
PYSIM_SHELL=./pySim-shell.py
|
||||
|
||||
function dump_logs {
|
||||
echo ""
|
||||
@@ -44,12 +45,11 @@ function dump_logs {
|
||||
function send_test_request {
|
||||
echo ""
|
||||
echo "Sending request to SMPP server:"
|
||||
TAR=$1
|
||||
C_APDU=$2
|
||||
R_APDU_EXPECTED=$3
|
||||
C_APDU=$1
|
||||
R_APDU_EXPECTED=$2
|
||||
|
||||
echo "Sending: $C_APDU"
|
||||
COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --tar $TAR --apdu $C_APDU"
|
||||
COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --kic-idx $KEY_INDEX --kid-idx $KEY_INDEX --algo-crypt $ALGO_CRYPT --algo-auth $ALGO_AUTH --tar $TAR --apdu $C_APDU"
|
||||
echo "Commandline: $COMMANDLINE"
|
||||
R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG`
|
||||
if [ $? -ne 0 ]; then
|
||||
@@ -57,7 +57,7 @@ function send_test_request {
|
||||
dump_logs
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Got response from SMPP server:"
|
||||
echo "Sent: $C_APDU"
|
||||
echo "Received: $R_APDU"
|
||||
@@ -68,16 +68,14 @@ function send_test_request {
|
||||
exit 1
|
||||
fi
|
||||
echo "Response matches the expected response -- success!"
|
||||
echo ""
|
||||
}
|
||||
|
||||
function start_smpp_server {
|
||||
PCSC_READER=$1
|
||||
|
||||
# Start the SMPP server
|
||||
echo ""
|
||||
echo "Starting SMPP server:"
|
||||
|
||||
# Start the SMPP server
|
||||
COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port $PYSIM_SMPP2SIM_PORT --apdu-trace"
|
||||
echo "Commandline: $COMMANDLINE"
|
||||
$COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 &
|
||||
@@ -102,55 +100,117 @@ function start_smpp_server {
|
||||
echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)"
|
||||
}
|
||||
|
||||
function find_card_by_iccid {
|
||||
# Find reader number of the card
|
||||
ICCID=$1
|
||||
function stop_smpp_server {
|
||||
echo ""
|
||||
echo "Stopping SMPP server:"
|
||||
kill $PYSIM_SMPP2SIM_PID
|
||||
echo "SMPP server stopped (PID=$PYSIM_SMPP2SIM_PID)"
|
||||
trap EXIT
|
||||
}
|
||||
|
||||
function find_card_by_iccid_or_eid {
|
||||
ICCID=$1
|
||||
EID=$2
|
||||
echo ""
|
||||
echo "Searching for card:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
if [ -n "$EID" ]; then
|
||||
echo "EID: \"$EID\""
|
||||
fi
|
||||
|
||||
# Determine number of available PCSC readers
|
||||
PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
|
||||
|
||||
# In case an EID is set, search for a card with that EID first
|
||||
if [ -n "$EID" ]; then
|
||||
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
|
||||
echo "probing card (eID) in reader $PCSC_READER ..."
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "get_eid" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep $EID > /dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Found card (eID) in reader $PCSC_READER"
|
||||
return $PCSC_READER
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
# Search for card with the given ICCID
|
||||
if [ -z "$ICCID" ]; then
|
||||
echo "invalid ICCID, zero length ICCID is not allowed! -- abort"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
|
||||
for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
|
||||
echo "probing card in reader $PCSC_READER ..."
|
||||
EF_ICCID_DECODED=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e 'select EF.ICCID' -e 'read_binary_decoded --oneline' 2> /dev/null | tail -1`
|
||||
echo $EF_ICCID_DECODED | grep $ICCID > /dev/null
|
||||
echo "probing card (ICCID) in reader $PCSC_READER ..."
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep $ICCID > /dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Found card in reader $PCSC_READER"
|
||||
echo "Found card (by ICCID) in reader $PCSC_READER"
|
||||
return $PCSC_READER
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Card with ICCID \"$ICCID\" not found -- abort"
|
||||
echo "Card not found -- abort"
|
||||
exit 1
|
||||
}
|
||||
|
||||
function enable_profile {
|
||||
PCSC_READER=$1
|
||||
ICCID=$2
|
||||
EID=$3
|
||||
if [ -z "$EID" ]; then
|
||||
# This is no eUICC, nothing to enable
|
||||
return 0
|
||||
fi
|
||||
|
||||
# Check if the profile is already enabled
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select EF.ICCID" -e "read_binary_decoded" 2> /dev/null | tail -3`
|
||||
ICCID_ENABLED=`echo $RESULT_JSON | jq -r '.iccid'`
|
||||
if [ $ICCID != $ICCID_ENABLED ]; then
|
||||
# Disable the currentle enabled profile
|
||||
echo ""
|
||||
echo "Disabeling currently enabled profile:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "disable_profile --iccid $ICCID_ENABLED" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep "ok" > /dev/null
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "unable to disable profile with \"$ICCID_ENABLED\""
|
||||
exit 1
|
||||
fi
|
||||
echo "profile disabled"
|
||||
|
||||
# Enable the profile we intend to test with
|
||||
echo ""
|
||||
echo "Enabeling profile:"
|
||||
echo "ICCID: \"$ICCID\""
|
||||
RESULT_JSON=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e "select ADF.ISD-R" -e "enable_profile --iccid $ICCID" 2> /dev/null | tail -3`
|
||||
echo $RESULT_JSON | grep "ok\|profileNotInDisabledState" > /dev/null
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "unable to enable profile with \"$ICCID\""
|
||||
exit 1
|
||||
fi
|
||||
echo "profile enabled"
|
||||
fi
|
||||
}
|
||||
|
||||
export PYTHONPATH=./
|
||||
|
||||
echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py"
|
||||
echo "=============================================================="
|
||||
|
||||
# TODO: At the moment we can only have one card and one testcase. This is
|
||||
# sufficient for now. We can extend this later as needed.
|
||||
|
||||
# Read test parameters from config from file
|
||||
TEST_CONFIG_FILE=${0%.*}.cfg
|
||||
echo "using config file: $TEST_CONFIG_FILE"
|
||||
if ! [ -e "$TEST_CONFIG_FILE" ]; then
|
||||
echo "test configuration file does not exist! -- abort"
|
||||
exit 1
|
||||
fi
|
||||
. $TEST_CONFIG_FILE
|
||||
|
||||
# Execute testcase
|
||||
find_card_by_iccid $ICCID
|
||||
start_smpp_server $?
|
||||
send_test_request $TAR $APDU "$EXPECTED_RESPONSE"
|
||||
|
||||
|
||||
TESTCASE_DIR=`dirname $0`
|
||||
for TEST_CONFIG_FILE in $TESTCASE_DIR/testcase_*.cfg ; do
|
||||
echo ""
|
||||
echo "running testcase: $TEST_CONFIG_FILE"
|
||||
. $TEST_CONFIG_FILE
|
||||
find_card_by_iccid_or_eid $ICCID $EID
|
||||
PCSC_READER=$?
|
||||
enable_profile $PCSC_READER $ICCID $EID
|
||||
start_smpp_server $PCSC_READER
|
||||
send_test_request $APDU "$EXPECTED_RESPONSE"
|
||||
stop_smpp_server
|
||||
echo ""
|
||||
echo "testcase ok"
|
||||
echo "--------------------------------------------------------------"
|
||||
done
|
||||
|
||||
echo "done."
|
||||
|
||||
17
tests/pySim-smpp2sim_test/testcase_3des_cbc2_rfm.cfg
Normal file
17
tests/pySim-smpp2sim_test/testcase_3des_cbc2_rfm.cfg
Normal file
@@ -0,0 +1,17 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoISIM-SJA5 card. For the testcase, the
|
||||
# key configuration on the card may be used as it is.
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
|
||||
EID=""
|
||||
KIC='51D4FC44BCBA7C4589DFADA3297720AF' # <-- change to the KIC1 of your card!
|
||||
KID='0449699C472CE71E2FB7B56245EF7684' # <-- change to the KID1 of your card!
|
||||
KEY_INDEX=1
|
||||
ALGO_CRYPT=triple_des_cbc2
|
||||
ALGO_AUTH=triple_des_cbc2
|
||||
TAR='B00010'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
|
||||
APDU='A0A40000027F20A0C0000016'
|
||||
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
|
||||
19
tests/pySim-smpp2sim_test/testcase_aes128_cbc_cmac_rfm.cfg
Normal file
19
tests/pySim-smpp2sim_test/testcase_aes128_cbc_cmac_rfm.cfg
Normal file
@@ -0,0 +1,19 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoEUICC1-C2T, which is equipped with the
|
||||
# TS48V1-B-UNIQUE test profile from https://test.rsp.sysmocom.de/ (Activation
|
||||
# code: 1$smdpp.test.rsp.sysmocom.de$TS48V1-B-UNIQUE). This testprofile must be
|
||||
# present on the eUICC before this testcase can be executed.
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949449999999990031"
|
||||
EID="89049044900000000000000000102355" # <-- change to the EID of your card!
|
||||
KIC='66778899aabbccdd1122334455eeff10'
|
||||
KID='112233445566778899aabbccddeeff10'
|
||||
KEY_INDEX=2
|
||||
ALGO_CRYPT=aes_cbc
|
||||
ALGO_AUTH=aes_cmac
|
||||
TAR='b00120'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.ICCID and returns the select response
|
||||
APDU='00a40004022fe200C000001d'
|
||||
EXPECTED_RESPONSE='621b8202412183022fe2a503d001408a01058b032f06038002000a8800 9000'
|
||||
28
tests/pySim-smpp2sim_test/testcase_aes256_cbc_cmac_rfm.cfg
Normal file
28
tests/pySim-smpp2sim_test/testcase_aes256_cbc_cmac_rfm.cfg
Normal file
@@ -0,0 +1,28 @@
|
||||
# Preparation:
|
||||
# This testcase executes against a sysmoISIM-SJA5 card. Since this card model is
|
||||
# shipped with a classic DES key configuration, it is necessary to provision
|
||||
# AES128 test keys before this testcase may be executed. The the following
|
||||
# pySim-shell command sequence may be used:
|
||||
#
|
||||
# verify_adm 34173960 # <-- change to the ADM key of your card!
|
||||
# select /DF.SYSTEM/EF.0348_KEY
|
||||
# update_record 10 fe03601111111111111111111111111111111111111111111111111111111111111111
|
||||
# update_record 11 fe03612222222222222222222222222222222222222222222222222222222222222222
|
||||
# update_record 12 fe03623333333333333333333333333333333333333333333333333333333333333333
|
||||
#
|
||||
# This overwrites one of the already existing 3DES SCP02 key (KVN 47) and replaces it
|
||||
# with an AES256 SCP80 key (KVN 3).
|
||||
|
||||
# Card parameter:
|
||||
ICCID="8949440000001155314" # <-- change to the ICCID of your card!
|
||||
EID=""
|
||||
KIC='1111111111111111111111111111111111111111111111111111111111111111'
|
||||
KID='2222222222222222222222222222222222222222222222222222222222222222'
|
||||
KEY_INDEX=3
|
||||
ALGO_CRYPT=aes_cbc
|
||||
ALGO_AUTH=aes_cmac
|
||||
TAR='B00010'
|
||||
|
||||
# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
|
||||
APDU='A0A40000027F20A0C0000016'
|
||||
EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
|
||||
@@ -310,11 +310,14 @@ class ConfigurableParameterTest(unittest.TestCase):
|
||||
p13n.SdKeyScp80Kvn03DesDek,
|
||||
#p13n.SdKeyScp80Kvn03DesEnc,
|
||||
#p13n.SdKeyScp80Kvn03DesMac,
|
||||
p13n.SdKeyScp81Kvn40Dek ,
|
||||
#p13n.SdKeyScp81Kvn40AesDek,
|
||||
p13n.SdKeyScp81Kvn40DesDek,
|
||||
#p13n.SdKeyScp81Kvn40Tlspsk,
|
||||
#p13n.SdKeyScp81Kvn41Dek ,
|
||||
#p13n.SdKeyScp81Kvn41AesDek,
|
||||
#p13n.SdKeyScp81Kvn41DesDek,
|
||||
p13n.SdKeyScp81Kvn41Tlspsk,
|
||||
#p13n.SdKeyScp81Kvn42Dek ,
|
||||
#p13n.SdKeyScp81Kvn42AesDek,
|
||||
#p13n.SdKeyScp81Kvn42DesDek,
|
||||
#p13n.SdKeyScp81Kvn42Tlspsk,
|
||||
):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user