Compare commits

..

77 Commits

Author SHA1 Message Date
Neels Hofmeyr e82ae66d84 Revert "esim/http_json_api: extend JSON API with server functionality"
This reverts commit e00c0becca.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr e345f31833 Revert "esim/http_json_api: add missing apidoc"
This reverts commit 0a1c5a27d7.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 504c83662a Revert "http_json_api: Only require Content-Type if response body is non-empty"
This reverts commit e0a9e73267.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 9677a8ecf2 Revert "esim/http_json_api: add alternative API interface"
This reverts commit f9d7c82b4d.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 7f87563d09 Revert "esim/http_json_api: add alternative API interface (follow up)"
This reverts commit 8b2a49aa8e.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 5d079d085f Revert "esim/http_json_api: allow URL rewriting"
This reverts commit 0634f77308.
2026-06-26 21:51:06 +02:00
Neels Hofmeyr fd8c2ba0b0 personalization: add GfmSuciRi, GfmSuciCalcInfo, and test
Change-Id: I1b69debf5992aa715171b43b30864dc152dc556f
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 27dfae5efc SAIP2.1_gfmsuci.der
Change-Id: I69354754013e4a98d86a9a8abdd7a6a36eb95177
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 785ebfdd06 debug log for rebuild_mandatory_services()
Change-Id: Ie53c84247a9524a81b475b3b02b5330207c15601
2026-06-26 21:51:06 +02:00
Neels Hofmeyr 0d8a43904f saip/personalization: add EuiccMandatoryServiceParam for 3 services
Even though the eUICC-Mandatory-services are always set by
saip.PES.rebuild_mandatory_services(), these new params are useful to
audit and validate the values as present in a given DER.

Change-Id: Icddeb2488c4a024c6ee5afcc1b6c8cc0e436c43c
2026-06-26 21:48:49 +02:00
Neels Hofmeyr 664d69db65 personalization: SUCI: ff-pad remaining file size
Change-Id: I9465eac9269e3f76dddd467109f547489a0feb93
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 0175b1045b saip.personalization: add SAIP 2.3 SUCI parameters
Change-Id: I3c0793b8a67bbd0c8247784bd3b5cbd265f94ec2
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 945dfb0129 tweak test_configurable_parameters.py: add iff_present flag
apply a parameter only when it exists in the template, will be useful
for suci

Change-Id: I5811ecde4c4e880bb8dbd22fffe23faafcfe36ad
2026-06-26 21:47:48 +02:00
Neels Hofmeyr f1447ea55b tweak test_configurable_parameters.py: show value found in template
Change-Id: If9039bbb8547ee24ae784a932f60cd5de6c9247b
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 7f32641008 tweak test_configurable_parameters.py: saner output composition.
Change-Id: Id3e3b46b2b3d7919a75c620803ce28d2a715008b
2026-06-26 21:47:48 +02:00
Neels Hofmeyr c01d7a70e1 TuakNumberOfKeccak: remove numeric_base to hide random number sources
Change-Id: I913878e3f05ad1e39ff45da75c67582a6a1f930f
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 8c81e70225 saip/batch: add blacklist to to_csv_rows()
Change-Id: I5e567e59a007cf4b5d75a4dcea5371ff4404bf24
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 434cc45259 ConfigurableParameter.get_typical_input_len: limit to 10 lines
Change-Id: Ia3d79e786f397a02bf2a8fafac5030d1198d9f76
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 434669e36c personalization: EF_SMSP: keep same length as found in template
Change-Id: Id24752101ae82c4986209f4103cc9cbdcce8ce1d
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 26ea0d1656 saip.batch: log parameter errors
Change-Id: I6a46b2dc9018078ab8361226d1e6b50d3b4e1aaa
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 7362525f8e personalization: add MncLen ConfigurableParameter
Implement as EnumParam because it only has two possible values

Change-Id: I6c600faeab00ffb072acbe94c9a8b2d1397c07d3
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 5e543f5566 personalization: EnumParam: implement as value_map, not enum.IntEnum
EnumParam is for labels shown in the UI, not an enum to be used in
python code (what the python enum module is written for). So it is
semantically wrong to use enum.IntEnum.

- the label should be allowed to be any string, not just valid python
  identifiers. For example, a label like "SUCI-in-USIM" should be
  possible.

- we should not "leak" UI labels into the python namespace.

- the value should be allowed to be any type, so that each
  ConfigurableParameter implementation can use whichever is its internal
  native type.

History: this patch is the original version of EnumParam, which was
modified during CR to use enum.IntEnum. However, newer
ConfigurableParameters coming up don't match well:
- MncLen (labels "2" and "3")
- EuiccMandatoryServiceParam (values True and False)
- EfUstServiceParam like SuciInUsim (labels "SUCI-in-UE" and
  "SUCI-in-USIM")

So this brings back the original capability for any label string, and
any type of value.

Change-Id: I690ceccf0ec7ef7067bcaa5cec1303cdaf0f78a4
2026-06-26 21:47:48 +02:00
Neels Hofmeyr e2ec0347f6 esim/http_json_api.py: support text/plain response Content-Type
Allow returning text/plain Content-Types as 'data' output argument.

So far, all the esim/http_json_api functions require a JSON response.
However, a specific vendor has a list function where the request is JSON
but the response is text/plain CSV data. Allow and return in a dict.

Change-Id: Iba6e4cef1048b376050a435a900c0f395655a790
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 7a1c506ba9 saip.SdKey*: remove transitional name mapping
Now, finally, all SdKey classes have a unified logical naming scheme.
Revert the transitional name mapping.

Change-Id: Ic185af4a903c2211a5361d023af9e7c6fc57ae78
2026-06-26 21:47:48 +02:00
Neels Hofmeyr cd7a788d6d saip.SdKey*: transitional name mapping
To help existing applications transition to a common naming scheme for
the SdKey classes, offer this intermediate result, where the SdKey
classes' .name are still unchanged as before generating them.

Change-Id: I974cb6c393a2ed2248a6240c2722d157e9235c33
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 7fcd21dd4f ts_31_102.py: EF_SUCI_Calc_Info(TransparentEF): fix len test
while len(foo):

throws an exception when foo == None.
Instead doing

    while foo:

fixes a problem when reading in empty SUCI calc info data, e.g. from
TS48v7.0_SAIP2.3_BERTLV_SUCI_NoRAMRFM.der.

Change-Id: Ia4e2356d0241d7a6ca399ba7e8be7f27ec836104
2026-06-26 21:47:48 +02:00
Neels Hofmeyr a23f06ba4c typo 'concetenation' in personalization.py
Change-Id: I51345db014335e8a70a7437a9cad5a3e47570a95
2026-06-26 21:47:48 +02:00
Neels Hofmeyr 0ed72f6ee7 test_configurable_parameters: test less templates
Change-Id: Ib75b6919a3acfddd99bf9baa9b6847ef731b9e67
2026-06-26 21:47:48 +02:00
Neels Hofmeyr c5f6ba83c2 saip BatchPersonalization: call rebuild_mandatory_services()
Particular reason: when manipulating the 5G SUCI parameters, the
mandatory services get-identity, profile-a-x25519 and profile-b-p256 may
need to be reconfigured.

In general, it is a good idea to run these checks anyway.

Change-Id: I5e6eef0f1845a25cddb03af8d16c40e305bcdc1f
2026-06-26 20:22:59 +02:00
Neels Hofmeyr d23e3fdd44 saip.PES.rebuild_mandatory_services(): set 5G get-identity, profile-a-x25519, profile-b-p256
Related: SYS#8096 SYS#8037
Change-Id: Ibc29c6437c5c92e2b14938b733156536863465c1
2026-06-26 20:21:10 +02:00
Alexander Couzens d0e6a1b119 euicc: get_profiles_info: add additional tags
Add definitions for ProfileOwner (decoded),
Notification Configuration Info, SM-DP+ proprietary data,
Profile Policy Rules.

Change-Id: I727dbe34d87a42bb3b526bd7a8accd687d20a208
2026-06-22 17:03:04 +00:00
Alexander Couzens 980282cc12 euicc: extend get_profiles_info to retrieve all known tags
get_profiles_info only request for the default tag list, but
not all tags.
Add --all to the function to request for all known tags.

Change-Id: Ia6878519a480bd625bb1fa2567c1fd2e0e89b071
2026-06-22 17:03:04 +00:00
Neels Hofmeyr 728940efb2 saip: add numeric_base indicator to ConfigurableParameter
By default, numeric_base = None, to indicate that there are no explicit
limitations on the number space.

For parameters that are definitely decimal, set numeric_base = 10.
For definitely hexadecimal, set numeric_base = 16.

Do the same for ConfigurableParameter as well as ParamSource, so callers
can match them up: if a parameter is numeric_base = 10, then omit
sources that are numeric_base = 16, and vice versa.

Change-Id: Ib0977bbdd9a85167be7eb46dd331fedd529dae01
Jenkins: skip-card-test
2026-06-22 19:56:29 +07:00
Neels Hofmeyr cfe2b94f67 saip SmspTpScAddr.get_values_from_pes: allow empty values
Change-Id: Ibbdd08f96160579238b50699091826883f2e9f5a
Jenkins: skip-card-test
2026-06-22 19:56:29 +07:00
Neels Hofmeyr 861ed0a1d8 add comment about not updating existing key_usage_qualifier
Change-Id: Ie23ae5fde17be6b37746784bf1601b4d0874397a
Jenkins: skip-card-test
2026-06-22 19:56:29 +07:00
Neels Hofmeyr b576e8fcff test_configurable_parameters.py: add tests for new parameters
For:
SmspTpScAddr
MilenageRotation
MilenageXoringConstants
TuakNrOfKeccak

Change-Id: Iecbea14fe31a9ee08d871dcde7f295d26d7bd001
Jenkins: skip-card-test
2026-06-22 19:48:52 +07:00
Neels Hofmeyr 38f93d974b SmspTpScAddr: fix SMSP record length and alpha_id padding
apply_val() was re-encoding the SMSP with the minimum total_len of 28,
which produces a 28-byte body with no alpha_id field.  After a DER
round-trip, the profile machinery re-pads the body to the original
record length using the template's fill pattern, which may not be 0xFF.
Those non-0xFF fill bytes end up in the alpha_id area, and GSM 7-bit
decoding then fails with a KeyError when the modified profile is read
back.

Fix by:
- setting alpha_id = '' so the field is present but empty
- setting f_smsp.rec_len = 42 (28 fixed bytes + 14 bytes of alpha_id
  padding) so the re-encoded body carries 0xFF-padded alpha_id space
  and the efFileSize in the fileDescriptor stays consistent
- passing total_len=f_smsp.rec_len to encode_record_bin() so the
  alpha_id area is actually padded to that length

Change-Id: Ief6e02517f3e96158a2509d763b88aec4bd5a296
Jenkins: skip-card-test
2026-06-21 06:17:54 +07:00
Neels Hofmeyr c5e7e59928 ConfigurableParameter: safer val length check
validate_val() calls len() to check the value against allow_len,
min_len and max_len. len() requires the object to have a __len__()
method, which integers do not — calling len() on an int raises
TypeError.

Fix this by checking for __len__ first: if present, use len(val) as
usual; otherwise fall back to len(str(val)), which gives the number
of decimal digits for integer values.

Change-Id: Ibe91722ed1477b00d20ef5e4e7abd9068ff2f3e4
Jenkins: skip-card-test
2026-06-21 03:46:16 +07:00
Neels Hofmeyr 98af3dd2e9 UppAudit: better indicate exception cause
Change-Id: I4d986b89a473a5b12ed56b4710263b034876a33e
Jenkins: skip-card-test
2026-06-21 03:46:16 +07:00
Neels Hofmeyr e9ff4f3b93 personalization: generate sdkey classes from a list
Change-Id: Ic92ddea6e1fad8167ea75baf78ffc3eb419838c4
Jenkins: skip-card-test
2026-06-21 03:46:16 +07:00
Neels Hofmeyr ce039d69ba saip/param_source: try to not repeat random values
Change-Id: I4fa743ef5677580f94b9df16a5051d1d178edeb0
Jenkins: skip-card-test
2026-06-21 03:46:16 +07:00
Neels Hofmeyr aad92f2b73 param_source: use secrets.SystemRandom as secure random nr source
secrets.SystemRandom is defined as the most secure random source
available on the given operating system.

Change-Id: I8049cd1292674b3ced82b0926569128535af6efe
Jenkins: skip-card-test
2026-06-21 03:46:11 +07:00
Neels Hofmeyr 512aba8b1d param_source: use random.SystemRandom as random nr source
Python's random module uses a PRNG (Mersenne Twister) which is
utterly insecure for key generation - it was so far only used for
testing.  Replace it with random.SystemRandom(), which draws from
/dev/urandom and is suitable for generating cryptographic key material.

Change-Id: I6de38c14ac6dd55bc84d53974192509c18d02bfa
Jenkins: skip-card-test
2026-06-21 03:42:06 +07:00
Neels Hofmeyr b5ba274583 add test_param_src.py
Change-Id: I03087b84030fddae98b965e0075d44e04ec6ba5c
Jenkins: skip-card-test
2026-06-21 03:30:02 +07:00
Neels Hofmeyr 4307cffc82 param_source: allow plugging a random implementation (for testing)
Change-Id: Idce2b18af70c17844d6f09f7704efc869456ac39
Jenkins: skip-card-test
2026-06-21 03:30:02 +07:00
Neels Hofmeyr bfdfcad22c personalization: add int as input type for BinaryParameter
Change-Id: I31d8142cb0847a8b291f8dc614d57cb4734f0190
Jenkins: skip-card-test
2026-06-21 03:30:02 +07:00
Neels Hofmeyr ef0a2fcb37 personalization.ConfigurableParameter: fix BytesIO() input
Change-Id: I0ad160eef9015e76eef10baee7c6b606fe249123
Jenkins: skip-card-test
2026-06-21 03:30:02 +07:00
Neels Hofmeyr 3974e96933 add test_configurable_parameters.py
Add ConfigurableParameterTest, which applies each parameter to a real
UPP DER template and reads it back, comparing results against a stored
expected-output snapshot (xo/test_configurable_parameters).

Add TestValidateVal covering validate_val() for Iccid, Imsi, Pin1, Puk1
and K, testing both valid inputs and invalid ones expected to raise
ValueError.

Add TestEnumParam covering the EnumParam methods (validate_val,
map_name_to_val, map_val_to_name, name_normalize, clean_name_str) using
AlgorithmID as the concrete subclass, including fuzzy name matching.

Also add get_value_from_pes() to ConfigurableParameter as a convenience
wrapper around get_values_from_pes() that asserts all returned values
are identical and returns the single result.

Change-Id: Ia55f0d11f8197ca15a948a83a34b3488acf1a0b4
Co-authored-by: Vadim Yanitskiy <vyanitskiy@sysmocom.de>
Jenkins: skip-card-test
2026-06-21 03:30:02 +07:00
Neels Hofmeyr a7c762eb2e ConfigurableParameter: do not magically overwrite the 'name' attribute
The ClassVarMeta metaclass used to derive each ConfigurableParameter's
'name' attribute automatically from the Python class name (via
camel_to_snake()).  Stop doing this, for three reasons:

1) Python class names follow constraints that do not fit the naming
   commonly used in CSV files.  For example, a name like
   "5GS-SUCI-CalcInfo" starts with a digit and contains dashes,
   neither of which is permissible in a class name.

2) Python class names live in their own namespace, distinct from the
   one used to present eSIM parameters to end users.  Deriving the UI
   name from the class name couples these two namespaces together.

   Taken together, (1) and (2) mean that automatic naming both imposes
   class-name constraints on the user-visible names and merges the
   internal Python namespace with the publicly shown one - a layer
   violation from the perspective of UI design.

3) Overriding 'name' from __new__() makes manual naming impossible: a
   subclass that sets 'name = "bar"' as a class attribute would still
   end up with the value computed by the metaclass, which is
   surprising and hard to track down:

       class MySuper(metaclass=...):  # __new__ sets name = 'foo'
           ...
       class MySub(MySuper):
           name = 'bar'
       print(MySub().name)  # 'foo', not 'bar' as one would expect

Change-Id: I6f631444c6addeb7ccc5f6c55b9be3dc83409169
Jenkins: skip-card-test
2026-06-21 03:29:38 +07:00
Neels Hofmeyr 710a27d6cf personalization audit: optionally audit all (unknown) SD keys
By a flag, allow to audit also all Security Domain KVN that we have
*not* created ConfigurableParameter subclasses for.

For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only
Scp80Kvn01, Scp80Kvn02, Scp80Kvn03. So we would not show kvn
0x04..0x0f in an audit.

This patch includes audits of all SD key kvn there may be in the UPP.
This will help to spot SD keys that may already be present in a UPP
template, with unexpected / unusual kvn.

Change-Id: Icaf6f7b589f117868633c0968a99f2f0252cf612
Jenkins: skip-card-test
2026-06-19 10:37:56 +00:00
Neels Hofmeyr 08f40db8a3 personalization: implement UppAudit and BatchAudit
Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
Jenkins: skip-card-test
2026-06-19 10:37:56 +00:00
kukutyin 4fb393e6ea fix(ts_51_011): fix lifecycle decoding
- implement proper LCS decoding per TS 102 221 / TS 31.101
- previous implementation misclassified multiple states

Related: OS#7018
Change-Id: I8a3bd820b9fbc13c025f8302d1d2eac21686c541
2026-06-08 11:54:32 +00:00
kukutyin ce5da32a75 fix(ts_51_011): apply correct access conditions length
When the access conditions are extracted from resp_bin, the wrong length
is used and only 2 bytes instead of 3 are extracted.

3GPP TS 51.011, section 9.2.1, table below "Response parameters/data
in case of an EF", clearly states that the length should be 3 bytes
(position 9-11)

Related: OS#7018
Change-Id: I410fb58c395beafba8de6d5ab4e71452f424cdf2
2026-06-08 11:54:28 +00:00
Philipp Maier 9ddd235a2c pySim/ts_51_011: rewrite comment for better understanding
The comment reads like that we were applying TS 102.221 here, but we only
mean our internal decoding format. The spec that actually matters here is
TS 51.011. Let's rephrase the comment so that this becomes more clear.

Related: OS#7018
Change-Id: Ie0184eea25f4d9f4baf9ab137c53a926edba2bf8
2026-06-03 08:46:02 +00:00
Philipp Maier 77eb30a782 unittests: add testcases for decode_select_response
A CardProfile class usually contains a static method decode_select_response.
Unfortunately those methods have no unit-test coverage yet. Let's add unit
tests for the decoders in CardProfileSIM and CardProfileUICC.

Related: OS#7018
Change-Id: Id2b5e005d7ad30d56c5c936e612600213620a0ed
2026-06-03 08:46:02 +00:00
YanTong C 2530329ae2 osmo-smdpp.py: use commonpath in transversal check
Use commonpath, as commonprefix allows accessing a sibiling directory
with the same prefix.

Change-Id: I7a42b40aa2bbcd5f0ec99f172503354c6eaa9828
2026-05-27 10:23:13 +02:00
Philipp Maier f9e4291a43 pySim/ara_m, cosmetic: swap --nfc-always and --nfc-never options
The commandline options --nfc-always and --nfc-never appear in the
opposite order when compared to --apdu-never and --apdu-always.

Let's swap the options to make the helpscreen and the code more
consistent.

Change-Id: I7289c3628b1b8dd3eec2f1c8f2132e3015422960
Related: SYS#6959
2026-05-13 15:49:20 +02:00
Philipp Maier 20538775b2 pySim/scp: migrate to pySimLogger
The module scp.py predates the existence of the pySimLogger and still
uses an individually created logger. Let's migrate to pySimLogger to
avoid unexpected effects and to be uniform with the other modules.

Related: SYS#6959
Change-Id: I5db7180f93f116dd2d99c33da264f74ea16a1a37
2026-05-08 17:54:33 +02:00
Philipp Maier ef58c94dfe pySim/filesystem: use pySimLogger instead of print
let's replace the stray print statements with proper logger calls.

Related: SYS#6959
Change-Id: I3a7188ad33706df66b2113e15cc7d06004c9bc39
2026-05-08 17:54:33 +02:00
Philipp Maier 810c51c38f pySim/app: use pySimLogger instead of print
let's replace the stray print statements with proper logger calls.

Related: SYS#6959
Change-Id: I95b4536cc8853e7ba6a5dd573b903dfb85e56b9a
2026-05-08 17:54:33 +02:00
Philipp Maier 66d3b54f92 pySimLogger: fix default log format string
In format string we prepend when we log in verbose mode. We use %(module)s
as format string quaifier. This qualifier is replaced with the name of the
module from where the logger was called. This is mostly equal to the logger
name (__name__) we pass when we create the logger.

However, this is not the behavior we actually want. We want to log the
logger name that we passed when the logger was created. For this, we must
use %(name)s as qualifier.

Related: SYS#6959
Change-Id: I3951a70ad6ce864a7158b093cba46ae9fc1cb5bd
2026-05-08 17:54:33 +02:00
Philipp Maier 7d11f91778 card_key_provider: add a static method to parse --column-keys args
The contents of the --column-keys arguments are currently parsed
in init_card_key_provider. Let's add a static method in
CardKeyFieldCryptor to simplify re-usage of the CardKeyFieldCryptor

Related: SYS#6959
Change-Id: Ic955f271b1de1b1b855b21c82ed10343044e45fa
2026-05-08 17:54:33 +02:00
Philipp Maier 58a324126e card_key_provider: pass CardKeyFieldCryptor to constructor
We currently create the CardKeyFieldCryptor object inside the constructor
of the concrete CardKeyProvider classes. There is currently no problem
with that, but when we create the CardKeyFieldCryptor object first and
then pass it as parameter to the constructor, we gain more flexibility
in case we want to support other CardKeyFieldCryptor variants in the
future.

Related: SYS#6959
Change-Id: If43552740aadacab9126f8a002749a9582eef8f4
2026-05-08 17:54:33 +02:00
Philipp Maier 3cd5c41fb4 card_key_provider: move boiler-plate code into helper functions
in pySim-shell.py we add the commandline options for the card key
provider and do the setup accordingly. Let's put this boilerplate
code into helper functions instead, so that we can re-use it in
other pySim programs as well. Let's use pySim.transport as a
pattern.

Related: SYS#6959
Change-Id: I6d095cbb644e608f4a751a1d0749b1484cdc781d
2026-05-08 17:54:33 +02:00
Philipp Maier 593bfa0911 ts_51_011/EF.SMSP: fix handling of 'alpha_id' field
The field 'alpha_id' is technically not an optional field, even though
the specification describes it as optional. Once the card manufacturer
decides that the field should be present, it must be always present and
vice versa.

(see code comment for a more detailed description)

Related: SYS#7765
Change-Id: I0ec99b2648b22c56f9145345e4cd8776f9217701
2026-04-29 19:39:14 +00:00
Philipp Maier 8fa7727a14 pySim-prog/cards: fix programming of EF.SMSP
The legacy code found in legacy/cards.py does not use the modern
construct based encoder (pySim-read uses it). The card classes either
use their own implementation of update_smsp or use the generic method
provided by the SimCard class. The latter one is true for FairwavesSIM
and WavemobileSim.

Unfortunately the implementation found in the SimCard is wrong. It
adds padding at the end of the file instead of the beginning. This
completely messes up the contents of EF.SMSP for the cards using this
method. To fix this, let's use the leftpad feature provided by
the update_record. This will ensure a correct alignment of the file
contents.

Related: SYS#7765
Change-Id: Ie112418f1f1461762d61365d3863181ca6be7245
2026-04-29 19:39:14 +00:00
Philipp Maier f1609424de pySim/transport: fix GET RESPONSE behaviour
The current behavior we implement in the method __send_apdu_T0 is
incomplete. Some details discussed in ETSI TS 102 221,
section 7.3.1.1.4, clause 4 seem to be not fully implemented. We
may also end up sending a GET RESPONSE in other APDU cases than
case 4 (the only case that uses the GET RESPONSE command).

Related: OS#6970
Change-Id: I26f0566af0cdd61dcc97f5f502479dc76adc37cc
2026-04-29 19:34:27 +00:00
Neels Hofmeyr 1167b65e2a comment in uicc.py on Security Domain Keys: add SCP81
Change-Id: Ib0205880f58e78c07688b4637abd5f67ea0570d1
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr cd4b01f67e personalization: fix SdKey.apply_val() implementation
'securityDomain' elements are decoded to ProfileElementSD instances,
which keep higher level representations of the key data apart from the
decoded[] lists.

So far, apply_val() was dropping binary values in decoded[], which does
not work, because ProfileElementSD._pre_encode() overwrites
self.decoded[] from the higher level representation.

Implement using
- ProfileElementSD.find_key() and SecurityDomainKeyComponent to modify
  an exsiting entry, or
- ProfileElementSD.add_key() to create a new entry.

Before this patch, SdKey parameters seemed to patch PES successfully,
but their modifications did not end up in the encoded DER.

(BTW, this does not fix any other errors that may still be present in
the various SdKey subclasses, patches coming up.)

Related: SYS#6768
Change-Id: I07dfc378705eba1318e9e8652796cbde106c6a52
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr 393de033d3 personalization: add get_typical_input_len() to ConfigurableParameter
The aim is to tell a user interface how wide an input text field should
be chosen to be convenient -- ideally showing the entire value in all
cases, but not too huge for fields that have no sane size limit.

Change-Id: I2568a032167a10517d4d75d8076a747be6e21890
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr 5f1c7d603c personalization: make AlgorithmID a new EnumParam
The AlgorithmID has a few preset values, and hardly anyone knows which
is which. So instead of entering '1', '2' or '3', make it work with
prededined values 'Milenage', 'TUAK' and 'usim-test'.

Implement the enum value part abstractly in new EnumParam.

Make AlgorithmID a subclass of EnumParam and define the values as from
pySim/esim/asn1/saip/PE_Definitions-3.3.1.asn

Related: SYS#6768
Change-Id: I71c2ec1b753c66cb577436944634f32792353240
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr d7072e9263 personalization: indicate default ParamSource per ConfigurableParameter
Add default_source class members pointing to ParamSource classes to all
ConfigurableParameter subclasses.

This is useful to automatically set up a default ParamSource for a given
ConfigurableParameter subclass, during user interaction to produce a
batch personalization.

For example, if the user selects a Pin1 parameter, a calling program can
implicitly set this to a RandomDigitSource, which will magically make it
work the way that most users need.

BTW, default_source and default_value can be combined to configure a
matching ParamSource instance:

  my_source = MyParam.default_source.from_str( MyParam.default_value )

Change-Id: Ie58d13bce3fa1aa2547cf3cee918c2f5b30a8b32
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr ac593bb14d personalization: implement reading back values from a PES
Implement get_values_from_pes(), the reverse direction of apply_val():
read back and return values from a ProfileElementSequence. Implement for
all ConfigurableParameter subclasses.

Future: SdKey.get_values_from_pes() is reading pe.decoded[], which works
fine, but I07dfc378705eba1318e9e8652796cbde106c6a52 will change this
implementation to use the higher level ProfileElementSD members.

Implementation detail:

Implement get_values_from_pes() as classmethod that returns a generator.
Subclasses should yield all occurences of their parameter in a given
PES.

For example, the ICCID can appear in multiple places.
Iccid.get_values_from_pes() yields all of the individual values. A set()
of the results quickly tells whether the PES is consistent.

Rationales for reading back values:

This allows auditing an eSIM profile, particularly for producing an
output.csv from a batch personalization (that generated lots of random
key material which now needs to be fed to an HLR...).

Reading back from a binary result is more reliable than storing the
values that were fed into a personalization.
By auditing final DER results with this code, I discovered:
- "oh, there already was some key material in my UPP template."
- "all IMSIs ended up the same, forgot to set up the parameter."
- the SdKey.apply() implementations currently don't work, see
  I07dfc378705eba1318e9e8652796cbde106c6a52 for a fix.

Change-Id: I234fc4317f0bdc1a486f0cee4fa432c1dce9b463
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Neels Hofmeyr a95622a022 personalization: add param_source.py, add batch.py
Implement pySim.esim.saip.batch.BatchPersonalization,
generating N eSIM profiles from a preset configuration.

Batch parameters can be fed by a constant, incrementing, random or from
CSV rows: add pySim.esim.saip.param_source.* classes to feed such input
to each of the BatchPersonalization's ConfigurableParameter instances.

Related: SYS#6768
Change-Id: I01ae40a06605eb205bfb409189fcd2b3a128855a
Jenkins: skip-card-test
2026-04-25 05:15:14 +07:00
Vadim Yanitskiy 03b58985a5 tests: pySim-smpp2sim_test.sh: fix copy-pasted comment
Change-Id: I8167c6a3251bb6755810c96075010e920ceee8ac
Jenkins: skip-card-test
2026-04-23 23:54:10 +07:00
Vadim Yanitskiy cc71dbf899 contrib/jenkins.sh: add setup_venv()
Reduece code duplication by factoring out virtualenv setup and
activation into a shell function.

Change-Id: Ibb193d12d5502c78104ef53badc6037f08e92df1
2026-04-23 23:54:00 +07:00
Vadim Yanitskiy aafc8d51c3 contrib/jenkins.sh: separate JOB_TYPE for card tests
A separate job gives us a possibility to skip tests requiring physical
cards for specific commits that do not touch the core logic.  See the
related commits in osmo-ci.git.

Change-Id: If76d812ee43b7eb3b57fdc660c60bf31fbff5b16
Related: osmo-ci.git Ia48d1b468f65d7c2e6b4128eeac36d0f3d03c45e
Related: osmo-ci.git I986d88545f64e13cd571ba9ff56bc924822e39a0
2026-04-23 23:52:55 +07:00
32 changed files with 4290 additions and 3212 deletions
+20 -14
View File
@@ -10,6 +10,11 @@
export PYTHONUNBUFFERED=1
setup_venv() {
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
}
if [ ! -d "./tests/" ] ; then
echo "###############################################"
echo "Please call from pySim-prog top directory"
@@ -23,8 +28,7 @@ fi
case "$JOB_TYPE" in
"test")
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
setup_venv
pip install -r requirements.txt
pip install pyshark
@@ -32,23 +36,27 @@ case "$JOB_TYPE" in
# Execute automatically discovered unit tests first
python -m unittest discover -v -s tests/unittests
# Run pySim-prog integration tests (requires physical cards)
cd tests/pySim-prog_test/
./pySim-prog_test.sh
cd ../../
# Run pySim-trace test
tests/pySim-trace_test/pySim-trace_test.sh
;;
"card-test") # tests requiring physical cards
setup_venv
# Run pySim-shell integration tests (requires physical cards)
pip install -r requirements.txt
# Run pySim-prog integration tests
cd tests/pySim-prog_test/
./pySim-prog_test.sh
cd ../../
# Run pySim-shell integration tests
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
# Run pySim-smpp2sim test
tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
;;
"distcheck")
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
setup_venv
pip install .
pip install pyshark
@@ -61,8 +69,7 @@ case "$JOB_TYPE" in
# Print pylint version
pip3 freeze | grep pylint
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
setup_venv
pip install .
@@ -80,8 +87,7 @@ case "$JOB_TYPE" in
contrib/*.py
;;
"docs")
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
setup_venv
pip install -r requirements.txt
+1 -1
View File
@@ -640,7 +640,7 @@ class SmDppHttpServer:
# look up profile based on matchingID. We simply check if a given file exists for now..
path = os.path.join(self.upp_dir, matchingId) + '.der'
# prevent directory traversal attack
if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
if os.path.commonpath((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
raise ApiError('8.2.6', '3.8', 'Refused')
if not os.path.isfile(path) or not os.access(path, os.R_OK):
raise ApiError('8.2.6', '3.8', 'Refused')
+5 -24
View File
@@ -69,8 +69,8 @@ from pySim.ts_102_222 import Ts102222Commands
from pySim.gsm_r import DF_EIRENE
from pySim.cat import ProactiveCommand
from pySim.card_key_provider import CardKeyProviderCsv, CardKeyProviderPgsql
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
from pySim.card_key_provider import card_key_provider_argparse_add_args, card_key_provider_init
from pySim.card_key_provider import card_key_provider_get_field, card_key_provider_get
from pySim.app import init_card
@@ -1146,18 +1146,6 @@ global_group.add_argument("--skip-card-init", help="Skip all card/profile initia
global_group.add_argument("--verbose", help="Enable verbose logging",
action='store_true', default=False)
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
card_key_group.add_argument('--csv', metavar='FILE',
default="~/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file')
card_key_group.add_argument('--pgsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg",
help='Read card data from PostgreSQL database (config file)')
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key')
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-column AES transport key', dest='column_key')
adm_group = global_group.add_mutually_exclusive_group()
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
help='ADM PIN used for provisioning (overwrites default)')
@@ -1170,6 +1158,7 @@ option_parser.add_argument("command", nargs='?',
help="A pySim-shell command that would optionally be executed at startup")
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
help="Optional Arguments for command")
card_key_provider_argparse_add_args(option_parser)
if __name__ == '__main__':
startup_errors = False
@@ -1178,16 +1167,8 @@ if __name__ == '__main__':
# Ensure that we are able to print formatted warnings from the beginning.
PySimLogger.setup(print, {logging.WARN: YELLOW}, opts.verbose)
# Register csv-file as card data provider, either from specified CSV
# or from CSV file in home directory
column_keys = {}
for par in opts.column_key:
name, key = par.split(':')
column_keys[name] = key
if os.path.isfile(os.path.expanduser(opts.csv)):
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), column_keys))
if os.path.isfile(os.path.expanduser(opts.pgsql)):
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), column_keys))
# Init card key provider for automatic card key retrieval
card_key_provider_init(opts)
# Init card reader driver
sl = init_reader(opts, proactive_handler = Proact())
+7 -4
View File
@@ -26,6 +26,9 @@ from pySim.cdma_ruim import CardProfileRUIM
from pySim.ts_102_221 import CardProfileUICC
from pySim.utils import all_subclasses
from pySim.exceptions import SwMatchError
from pySim.log import PySimLogger
log = PySimLogger.get(__name__)
# we need to import this module so that the SysmocomSJA2 sub-class of
# CardModel is created, which will add the ATR-based matching and
@@ -54,7 +57,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
# Wait up to three seconds for a card in reader and try to detect
# the card type.
print("Waiting for card...")
log.info("Waiting for card...")
sl.wait_for_card(3)
# The user may opt to skip all card initialization. In this case only the
@@ -66,7 +69,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
generic_card = False
card = card_detect(scc)
if card is None:
print("Warning: Could not detect card type - assuming a generic card type...")
log.warning("Could not detect card type - assuming a generic card type...")
card = SimCardBase(scc)
generic_card = True
@@ -76,7 +79,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
# just means that pySim was unable to recognize the card profile. This
# may happen in particular with unprovisioned cards that do not have
# any files on them yet.
print("Unsupported card type!")
log.warning("Unsupported card type!")
return None, card
# ETSI TS 102 221, Table 9.3 specifies a default for the PIN key
@@ -87,7 +90,7 @@ def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState,
if generic_card and isinstance(profile, CardProfileUICC):
card._adm_chv_num = 0x0A
print("Info: Card is of type: %s" % str(profile))
log.info("Card is of type: %s", str(profile))
# FIXME: this shouldn't really be here but somewhere else/more generic.
# We cannot do it within pySim/profile.py as that would create circular
+2 -2
View File
@@ -334,10 +334,10 @@ class ADF_ARAM(CardADF):
apdu_grp.add_argument(
'--apdu-filter', help='APDU filter: multiple groups of 8 hex bytes (4 byte CLA/INS/P1/P2 followed by 4 byte mask)')
nfc_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
nfc_grp.add_argument('--nfc-always', action='store_true',
help='NFC event access is allowed')
nfc_grp.add_argument('--nfc-never', action='store_true',
help='NFC event access is not allowed')
nfc_grp.add_argument('--nfc-always', action='store_true',
help='NFC event access is allowed')
store_ref_ar_do_parse.add_argument(
'--android-permissions', help='Android UICC Carrier Privilege Permissions (8 hex bytes)')
+69 -6
View File
@@ -33,10 +33,12 @@ from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h
from pySim.log import PySimLogger
import os
import abc
import csv
import logging
import yaml
import argparse
log = PySimLogger.get(__name__)
@@ -130,6 +132,31 @@ class CardKeyFieldCryptor:
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.encrypt(h2b(plaintext_val)))
@staticmethod
def argparse_add_args(arg_parser: argparse.ArgumentParser):
arg_parser.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help='per-column AES transport key', dest='column_key')
# Depprecated argument, replaced by --column-key (see above)
arg_parser.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
help=argparse.SUPPRESS, dest='column_key')
@staticmethod
def transport_keys_from_opts(opts: argparse.Namespace) -> dict:
"""
Transport keys are passed via the commandline using the '--column-key' option. Each column requires a
dedicated transport key. This method can be used to extract the column keys parameters from the commandline
options into a dict that can be directly passed to the construtor with the transport_keys argument.
Args:
opts: parsed commandline options (Namespace)
"""
transport_keys = {}
for par in opts.column_key:
name, key = par.split(':')
transport_keys[name] = key
return transport_keys
class CardKeyProvider(abc.ABC):
"""Base class, not containing any concrete implementation."""
@@ -148,24 +175,33 @@ class CardKeyProvider(abc.ABC):
fond None shall be returned.
"""
@staticmethod
def argparse_add_args(arg_parser: argparse.ArgumentParser):
"""
Add the commandline arguments relevant for this card key provider.
Args:
arg_parser : argument parser group
"""
def __str__(self):
return type(self).__name__
class CardKeyProviderCsv(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified CSV file."""
def __init__(self, csv_filename: str, transport_keys: dict):
def __init__(self, csv_filename: str, field_cryptor: CardKeyFieldCryptor):
"""
Args:
csv_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
field_cryptor : (see class CardKeyFieldCryptor)
"""
log.info("Using CSV file as card key data source: %s" % csv_filename)
self.csv_file = open(csv_filename, 'r')
if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
self.csv_filename = csv_filename
self.crypt = CardKeyFieldCryptor(transport_keys)
self.crypt = field_cryptor
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
self.csv_file.seek(0)
@@ -188,14 +224,20 @@ class CardKeyProviderCsv(CardKeyProvider):
return None
return return_dict
@staticmethod
def argparse_add_args(arg_parser: argparse.ArgumentParser):
arg_parser.add_argument('--csv', metavar='FILE',
default="~/.osmocom/pysim/card_data.csv",
help='Read card data from CSV file')
class CardKeyProviderPgsql(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
def __init__(self, config_filename: str, transport_keys: dict):
def __init__(self, config_filename: str, field_cryptor: CardKeyFieldCryptor):
"""
Args:
config_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
field_cryptor : (see class CardKeyFieldCryptor)
"""
import psycopg2
log.info("Using SQL database as card key data source: %s" % config_filename)
@@ -212,7 +254,7 @@ class CardKeyProviderPgsql(CardKeyProvider):
host=config.get('host'))
self.tables = config.get('table_names')
log.info("Card key database tables: %s" % str(self.tables))
self.crypt = CardKeyFieldCryptor(transport_keys)
self.crypt = field_cryptor
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
import psycopg2
@@ -252,6 +294,11 @@ class CardKeyProviderPgsql(CardKeyProvider):
result[k] = self.crypt.decrypt_field(k, result.get(k))
return result
@staticmethod
def argparse_add_args(arg_parser: argparse.ArgumentParser):
arg_parser.add_argument('--pgsql', metavar='FILE',
default="~/.osmocom/pysim/card_data_pgsql.cfg",
help='Read card data from PostgreSQL database (config file)')
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
"""Register a new card key provider.
@@ -305,3 +352,19 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
fields = [field]
result = card_key_provider_get(fields, key, value, card_key_providers)
return result.get(field.upper())
def card_key_provider_argparse_add_args(arg_parser: argparse.ArgumentParser):
"""Add card key provider commandline options to the given argument parser"""
card_key_group = arg_parser.add_argument_group('Card Key Provider Options')
CardKeyProviderCsv.argparse_add_args(card_key_group)
CardKeyProviderPgsql.argparse_add_args(card_key_group)
CardKeyFieldCryptor.argparse_add_args(card_key_group)
def card_key_provider_init(opts: argparse.Namespace):
"""Initialize card key provider depending on the user provided commandline options"""
transport_keys = CardKeyFieldCryptor.transport_keys_from_opts(opts)
card_key_field_cryptor = CardKeyFieldCryptor(transport_keys)
if os.path.isfile(os.path.expanduser(opts.csv)):
card_key_provider_register(CardKeyProviderCsv(os.path.expanduser(opts.csv), card_key_field_cryptor))
if os.path.isfile(os.path.expanduser(opts.pgsql)):
card_key_provider_register(CardKeyProviderPgsql(os.path.expanduser(opts.pgsql), card_key_field_cryptor))
+11 -134
View File
@@ -16,12 +16,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests
from klein import Klein
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
from twisted.internet.posixbase import PosixReactorBase
from pathlib import Path
from twisted.web.server import Site, Request
import logging
from datetime import datetime
import time
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
class DownloadOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/downloadOrder'
input_params = {
'header': JsonRequestHeader,
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType
}
input_mandatory = ['header']
output_params = {
'header': JsonResponseHeader,
'iccid': param.Iccid,
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
class ConfirmOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/confirmOrder'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
'smdsAddress': param.SmdsAddress,
'releaseFlag': param.ReleaseFlag,
}
input_mandatory = ['header', 'iccid', 'releaseFlag']
input_mandatory = ['iccid', 'releaseFlag']
output_params = {
'header': JsonResponseHeader,
'eid': param.Eid,
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
class CancelOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/cancelOrder'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
}
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
output_params = {
'header': JsonResponseHeader,
}
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
class ReleaseProfile(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/releaseProfile'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
}
input_mandatory = ['header', 'iccid']
input_mandatory = ['iccid']
output_params = {
'header': JsonResponseHeader,
}
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
class HandleDownloadProgressInfo(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
input_params = {
'header': JsonRequestHeader,
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType,
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
'notificationPointStatus': param.NotificationPointStatus,
'resultData': param.ResultData,
}
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
expected_http_status = 204
class Es2pApiClient:
"""Main class representing a full ES2+ API client. Has one method for each API function."""
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
@@ -217,17 +206,18 @@ class Es2pApiClient:
if client_cert:
self.session.cert = client_cert
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
def _gen_func_id(self) -> str:
"""Generate the next function call id."""
self.func_id += 1
return 'FCI-%u-%u' % (time.time(), self.func_id)
def call_downloadOrder(self, data: dict) -> dict:
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
return self.downloadOrder.call(data, self._gen_func_id())
@@ -247,116 +237,3 @@ class Es2pApiClient:
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
class Es2pApiServerHandlerSmdpp(abc.ABC):
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
@abc.abstractmethod
def call_downloadOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
pass
@abc.abstractmethod
def call_confirmOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
pass
@abc.abstractmethod
def call_cancelOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
pass
@abc.abstractmethod
def call_releaseProfile(self, data: dict) -> (dict, str):
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
pass
class Es2pApiServerHandlerMno(abc.ABC):
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
@abc.abstractmethod
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
pass
class Es2pApiServer(abc.ABC):
"""Main class representing a full ES2+ API server. Has one method for each API function."""
app = None
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
self.port = port
self.interface = interface
if server_cert:
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
else:
self.server_cert = None
if client_cert_verify:
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
else:
self.client_cert_verify = None
def reactor(self, reactor: PosixReactorBase):
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
if self.server_cert:
if self.client_cert_verify:
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
interface=self.interface)
else:
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
interface=self.interface)
else:
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
return defer.Deferred()
class Es2pApiServerSmdpp(Es2pApiServer):
"""ES2+ (SMDP+ side) API Server."""
app = Klein()
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
server_cert: str = None, client_cert_verify: str = None):
super().__init__(port, interface, server_cert, client_cert_verify)
self.handler = handler
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
task.react(self.reactor)
@app.route(DownloadOrder.path)
def call_downloadOrder(self, request: Request) -> dict:
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
return self.downloadOrder.call(request)
@app.route(ConfirmOrder.path)
def call_confirmOrder(self, request: Request) -> dict:
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
return self.confirmOrder.call(request)
@app.route(CancelOrder.path)
def call_cancelOrder(self, request: Request) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
return self.cancelOrder.call(request)
@app.route(ReleaseProfile.path)
def call_releaseProfile(self, request: Request) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
return self.releaseProfile.call(request)
class Es2pApiServerMno(Es2pApiServer):
"""ES2+ (MNO side) API Server."""
app = Klein()
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
server_cert: str = None, client_cert_verify: str = None):
super().__init__(port, interface, server_cert, client_cert_verify)
self.handler = handler
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
handler.call_handleDownloadProgressInfo)
task.react(self.reactor)
@app.route(HandleDownloadProgressInfo.path)
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
return self.handleDownloadProgressInfo.call(request)
+5 -5
View File
@@ -155,11 +155,11 @@ class Es9pApiClient:
if server_cert_verify:
self.session.verify = server_cert_verify
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
self.handleNotification = HandleNotification(url_prefix, '', self.session)
self.cancelSession = CancelSession(url_prefix, '', self.session)
def call_initiateAuthentication(self, data: dict) -> dict:
return self.initiateAuthentication.call(data)
+45 -268
View File
@@ -19,10 +19,8 @@ import abc
import requests
import logging
import json
from typing import Optional, Tuple
from typing import Optional
import base64
from twisted.web.server import Request
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
raise ValueError('Unknown/unspecified status "%s"' % status)
class JsonRequestHeader(ApiParam):
"""SGP.22 section 6.5.1.3."""
@classmethod
def verify_decoded(cls, data):
func_req_id = data.get('functionRequesterIdentifier')
if not func_req_id:
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
func_call_id = data.get('functionCallIdentifier')
if not func_call_id:
raise ValueError('Missing mandatory functionCallIdentifier in header')
class HttpStatusError(Exception):
pass
@@ -173,118 +161,65 @@ class ApiError(Exception):
class JsonHttpApiFunction(abc.ABC):
"""Base class for representing an HTTP[s] API Function."""
# The below class variables are used to describe the properties of the API function. Derived classes are expected
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
# prefix.
# the below class variables are expected to be overridden in derived classes
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder', see also method rewrite_url).
path = None
# dictionary of input parameters. key is parameter name, value is ApiParam class
input_params = {}
# list of mandatory input parameters
input_mandatory = []
# dictionary of output parameters. key is parameter name, value is ApiParam class
output_params = {}
# list of mandatory output parameters (for successful response)
output_mandatory = []
# list of mandatory output parameters (for failed response)
output_mandatory_failed = []
# expected HTTP status code of the response
expected_http_status = 200
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
http_method = 'POST'
# additional custom HTTP headers (client requests)
extra_http_req_headers = {}
# additional custom HTTP headers (server responses)
extra_http_res_headers = {}
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
self.url_prefix = url_prefix
self.func_req_id = func_req_id
self.session = session
def __new__(cls, *args, role = 'legacy_client', **kwargs):
"""
Args:
args: (see JsonHttpApiClient and JsonHttpApiServer)
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
"""
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
# decode_ methods below). The dictionary will not include any dunder/magic methods
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
# Normal instantiation as JsonHttpApiFunction:
if len(args) == 0 and len(kwargs) == 0:
return type(cls.__name__, (abc.ABC,), cls_attr)()
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
if role == 'legacy_client':
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
result.api_func = result
result.legacy = True
return result
elif role == 'client':
# Create a JsonHttpApiFunction in client role
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
result.api_func = result
return result
elif role == 'server':
# Create a JsonHttpApiFunction in server role
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
result.api_func = result
return result
else:
raise ValueError('Invalid role \'%s\' specified' % role)
def encode_client(self, data: dict) -> dict:
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
"""Validate an encode input dict into JSON-serializable dict for request body."""
output = {}
if func_call_id:
output['header'] = {
'functionRequesterIdentifier': self.func_req_id,
'functionCallIdentifier': func_call_id
}
for p in self.input_mandatory:
if not p in data:
raise ValueError('Mandatory input parameter %s missing' % p)
for p, v in data.items():
p_class = self.input_params.get(p)
if not p_class:
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
# pylint: disable=no-member
if hasattr(self, 'legacy') and self.legacy:
output[p] = JsonRequestHeader.encode(v)
else:
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
output[p] = v
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
output[p] = v
else:
output[p] = p_class.encode(v)
return output
def decode_client(self, data: dict) -> dict:
def decode(self, data: dict) -> dict:
"""[further] Decode and validate the JSON-Dict of the response body."""
output = {}
output_mandatory = self.output_mandatory
if 'header' in self.output_params:
# let's first do the header, it's special
if not 'header' in data:
raise ValueError('Mandatory output parameter "header" missing')
hdr_class = self.output_params.get('header')
output['header'] = hdr_class.decode(data['header'])
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
# different set of mandatory parameters applies.
header = data.get('header')
if header:
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
output_mandatory = self.output_mandatory_failed
for p in output_mandatory:
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
raise ApiError(output['header']['functionExecutionStatus'])
# we can only expect mandatory parameters to be present in case of successful execution
for p in self.output_mandatory:
if p == 'header':
continue
if not p in data:
raise ValueError('Mandatory output parameter "%s" missing' % p)
for p, v in data.items():
@@ -296,195 +231,37 @@ class JsonHttpApiFunction(abc.ABC):
output[p] = p_class.decode(v)
return output
def encode_server(self, data: dict) -> dict:
"""Validate an encode input dict into JSON-serializable dict for response body."""
output = {}
output_mandatory = self.output_mandatory
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
# different set of mandatory parameters applies.
header = data.get('header')
if header:
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
output_mandatory = self.output_mandatory_failed
for p in output_mandatory:
if not p in data:
raise ValueError('Mandatory output parameter %s missing' % p)
for p, v in data.items():
p_class = self.output_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
output[p] = v
else:
output[p] = p_class.encode(v)
return output
def decode_server(self, data: dict) -> dict:
"""[further] Decode and validate the JSON-Dict of the request body."""
output = {}
for p in self.input_mandatory:
if not p in data:
raise ValueError('Mandatory input parameter "%s" missing' % p)
for p, v in data.items():
p_class = self.input_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
output[p] = v
else:
output[p] = p_class.decode(v)
return output
def rewrite_url(self, data: dict, url: str) -> Tuple[dict, str]:
"""
Rewrite a static URL using information passed in the data dict. This method may be overloaded by a derived
class to allow fully dynamic URLs. The input parameters required for the URL rewriting may be passed using
data parameter. In case those parameters are additional parameters that are not intended to be passed to
the encode_client method later, they must be removed explcitly.
Args:
data: (see JsonHttpApiClient and JsonHttpApiServer)
url: statically generated URL string (see comment in JsonHttpApiClient)
"""
# This implementation is a placeholder in which we do not perform any URL rewriting. We just pass through data
# and url unmodified.
return data, url
class JsonHttpApiClient():
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
session: requests.Session):
"""
Args:
api_func : API function definition (JsonHttpApiFunction)
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
func_req_id : function requestor id to use for requests
session : session object (requests)
"""
self.api_func = api_func
self.url_prefix = url_prefix
self.func_req_id = func_req_id
self.session = session
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
"""
Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
json-serializable fields. `data` may also contain additional parameters required for URL rewriting (see
rewrite_url in class JsonHttpApiFunction). Output data is returned as json-deserialized dict.
Args:
data: Input data required to perform the request.
func_call_id: Function Call Identifier, if present a header field is generated automatically.
timeout: Maximum amount of time to wait for the request to complete.
"""
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
# field is checked by the encode_client method)
if func_call_id:
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
'functionCallIdentifier': func_call_id}} | data
# The URL used for the HTTP request (see below) normally consists of the initially given url_prefix
# concatenated with the path defined by the JsonHttpApiFunction definition. This static URL path may be
# rewritten by rewrite_url method defined in the JsonHttpApiFunction.
data, url = self.api_func.rewrite_url(data, self.url_prefix + self.api_func.path)
# Encode the message (the presence of mandatory fields is checked during encoding)
encoded = json.dumps(self.api_func.encode_client(data))
# Apply HTTP request headers according to SGP.22, section 6.5.1
"""Make an API call to the HTTP API endpoint represented by this object.
Input data is passed in `data` as json-serializable dict. Output data
is returned as json-deserialized dict."""
url = self.url_prefix + self.path
encoded = json.dumps(self.encode(data, func_call_id))
req_headers = {
'Content-Type': 'application/json',
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
}
req_headers.update(self.api_func.extra_http_req_headers)
req_headers.update(self.extra_http_req_headers)
# Perform HTTP request
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
logger.debug("HTTP RSP: %s" % (response.content))
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
# SGP.22, section 6.5.1)
if response.status_code != self.api_func.expected_http_status:
if response.status_code != self.expected_http_status:
raise HttpStatusError(response)
if response.content and not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
resp_content_type = response.headers.get('Content-Type')
if not resp_content_type.startswith(req_headers['Content-Type']):
raise HttpHeaderError(response)
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
raise HttpHeaderError(response)
# Decode response and return the result back to the caller
if response.content:
output = self.api_func.decode_client(response.json())
# In case the response contains a header, check it to make sure that the API call was executed successfully
# (the presence of the header field is checked by the decode_client method)
if 'header' in output:
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
raise ApiError(output['header']['functionExecutionStatus'])
return output
if resp_content_type.startswith('application/json'):
return self.decode(response.json())
elif resp_content_type.startswith('text/plain;charset=UTF-8'):
return { 'data': response.content.decode('utf-8') }
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
return None
class JsonHttpApiServer():
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
"""
Args:
api_func : API function definition (JsonHttpApiFunction)
call_handler : handler function to process the request. This function must accept the
decoded request as a dictionary. The handler function must return a tuple consisting
of the response in the form of a dictionary (may be empty), and a function execution
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
"""
self.api_func = api_func
if call_handler:
self.call_handler = call_handler
else:
self.call_handler = self.default_handler
def default_handler(self, data: dict) -> (dict, str):
"""default handler, used in case no call handler is provided."""
logger.error("no handler function for request: %s" % str(data))
return {}, 'Failed'
def call(self, request: Request) -> str:
""" Process an incoming request.
Args:
request : request object as received using twisted.web.server
Returns:
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
provided the request object)
"""
# Make sure the request is done with the correct HTTP method
if (request.method.decode() != self.api_func.http_method):
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
# Decode the request
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
# Run call handler (see above)
data, fe_status = self.call_handler(decoded_request)
# In case a function execution status is returned, use it to generate and prepend the header field according to
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
if fe_status:
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
# Encode the message (the presence of mandatory fields is checked during encoding)
encoded = json.dumps(self.api_func.encode_server(data))
# Apply HTTP request headers according to SGP.22, section 6.5.1
res_headers = {
'Content-Type': 'application/json',
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
}
res_headers.update(self.api_func.extra_http_res_headers)
for header, value in res_headers.items():
request.setHeader(header, value)
request.setResponseCode(self.api_func.expected_http_status)
# Return the encoded result back to the caller for sending (using twisted/klein)
return encoded
+56 -4
View File
@@ -34,7 +34,7 @@ from pySim import ts_102_222
from pySim.utils import dec_imsi
from pySim.ts_102_221 import FileDescriptor
from pySim.filesystem import CardADF, Path
from pySim.ts_31_102 import ADF_USIM
from pySim.ts_31_102 import ADF_USIM, EF_UST, EF_SUCI_Calc_Info
from pySim.ts_31_103 import ADF_ISIM
from pySim.esim import compile_asn1_subdir
from pySim.esim.saip import templates
@@ -1517,8 +1517,11 @@ class ProfileElementHeader(ProfileElement):
def mandatory_service_add(self, service_name):
self.decoded['eUICC-Mandatory-services'][service_name] = None
def mandatory_service_present(self, service_name):
return service_name in self.decoded['eUICC-Mandatory-services'].keys()
def mandatory_service_remove(self, service_name):
if service_name in self.decoded['eUICC-Mandatory-services'].keys():
if self.mandatory_service_present(service_name):
del self.decoded['eUICC-Mandatory-services'][service_name]
else:
raise ValueError("service not in eUICC-Mandatory-services list, cannot remove")
@@ -1726,12 +1729,61 @@ class ProfileElementSequence:
if 'BT' in ftype_list:
svc_set.add('ber-tlv')
# FIXME:dfLinked files (scan all files, check for non-empty Fcp.linkPath presence of DFs)
# TODO: 5G related bits (derive from EF.UST or file presence?)
# 5G:
# - When SUCI is:
# - enabled (EF.UST 124 = true)
# AND
# - calculated in the USIM (EF.UST 125 = true),
# then eUICC-Mandatory-services needs 'get-identity'.
# - 'get-identity' implies that the eUICC must support ONE OF profile-A OR profile-B.
# (One might assume from this that, when SUCI-CalcInfo for USIM in DF.SAIP contains both key types, then no
# profile-A or B services need to be requested explicitly. However, the correct logic is:)
# - Iff the SUCI-CalcInfo for USIM (DF.SAIP) contains a key of profile-A ("identifier": 1),
# then eUICC-Mandatory-services needs 'profile-a-x25519'.
# - Same: profile-B ("identifier": 2) needs 'profile-b-p256'.
# - (When SUCI is calculated in the UE, then the eUICC does not need to provide any of these services.)
suci_in_usim_enabled = False
try:
f_ust = self.get_pe_for_type("usim").files["ef-ust"]
ust = EF_UST().decode_bin(f_ust.body)
suci_in_usim_enabled = ust[124]['activated'] and ust[125]['activated']
except (KeyError, AttributeError):
pass
if suci_in_usim_enabled:
svc_set.add('get-identity')
# now check for profile-a and profile-b presence
suci_calcinfo_has_profile_a = False
suci_calcinfo_has_profile_b = False
try:
f_sucici = self.get_pe_for_type("df-saip").files["ef-suci-calc-info-usim"]
sucici = EF_SUCI_Calc_Info().decode_bin(f_sucici.body) or {}
for prot_scheme in sucici['prot_scheme_id_list']:
if not isinstance(prot_scheme, dict):
continue
ps_id = prot_scheme["identifier"]
if ps_id == 1:
suci_calcinfo_has_profile_a = True
elif ps_id == 2:
suci_calcinfo_has_profile_b = True
except (KeyError, AttributeError):
pass
if suci_calcinfo_has_profile_a:
# The profile has a profile-A key, so require that
svc_set.add('profile-a-x25519')
if suci_calcinfo_has_profile_b:
# The profile has a profile-B key, so require that
svc_set.add('profile-b-p256')
hdr_pe = self.get_pe_for_type('header')
# patch in the 'manual' services from the existing list:
old_svc_set = set()
for old_svc in hdr_pe.decoded['eUICC-Mandatory-services'].keys():
if old_svc in manual_services:
svc_set.add(old_svc)
old_svc_set.add(old_svc)
logger.debug(f"{svc_set=} + {old_svc_set=}")
svc_set = svc_set.union(old_svc_set)
logger.debug(f"{svc_set=}")
hdr_pe.decoded['eUICC-Mandatory-services'] = {x: None for x in svc_set}
def rebuild_mandatory_gfstelist(self):
+35 -21
View File
@@ -20,22 +20,32 @@
import copy
import pprint
from typing import List, Generator
import logging
import traceback
import inspect
from typing import Generator, Union
from pySim.esim.saip.personalization import ConfigurableParameter
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
from pySim.global_platform import KeyUsageQualifier
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
def _func_():
return inspect.currentframe().f_back.f_code.co_name
# a list of ConfigurableParameter classes and/or ConfigurableParameter class instances
ParamList = list[Union[type[ConfigurableParameter], ConfigurableParameter]]
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
Usage example:
der_input = some_file.open('rb').read()
der_input = open('some_file', 'rb').read()
pes = ProfileElementSequence.from_der(der_input)
p = pers.BatchPersonalization(
p = BatchPersonalization(
n=10,
src_pes=pes,
csv_rows=get_csv_reader())
@@ -68,7 +78,7 @@ class BatchPersonalization:
def __init__(self,
n: int,
src_pes: ProfileElementSequence,
params: list[ParamAndSrc]=[],
params: list[ParamAndSrc]=None,
csv_rows: Generator=None,
):
"""
@@ -118,8 +128,12 @@ class BatchPersonalization:
value = p.param_cls.validate_val(input_value)
p.param_cls.apply_val(pes, value)
except Exception as e:
print(traceback.format_exc())
logger.error('during %s: %r', _func_(), e)
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
pes.rebuild_mandatory_services()
yield pes
@@ -131,14 +145,14 @@ class UppAudit(dict):
"""
@classmethod
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
def from_der(cls, der: bytes, params: ParamList, der_size=False, additional_sd_keys=False):
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
both 'IMSI' and 'IMSI-ACC' parameters.
e.g.
UppAudit.from_der(my_der, [Imsi, ])
--> {'IMSI': '001010000000023', 'IMSI-ACC': '5'}
--> {'IMSI': {'001010000000023'}, 'IMSI-ACC': {'5'}}
(where 'IMSI' == Imsi.name)
@@ -183,11 +197,11 @@ class UppAudit(dict):
audit_key = f'SdKey_KVN{key.key_version_number:02x}_ID{key.key_identifier:02x}'
kuq_bin = KeyUsageQualifier.build(key.key_usage_qualifier).hex()
audit_val = f'{key.key_components=!r} key_usage_qualifier=0x{kuq_bin}={key.key_usage_qualifier!r}'
upp_audit[audit_key] = set((audit_val, ))
upp_audit.add_values({audit_key: audit_val})
return upp_audit
def get_single_val(self, key, validate=True, allow_absent=False, absent_val=None):
def get_single_val(self, key, allow_absent=False, absent_val=None):
"""
Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
@@ -227,7 +241,7 @@ class UppAudit(dict):
v = try_single_val(v)
if isinstance(v, bytes):
v = bytes_to_hexstr(v)
v = b2h(v)
if v is None:
return 'not present'
return str(v)
@@ -237,21 +251,21 @@ class UppAudit(dict):
return UppAudit.audit_val_to_str(self.get(key))
def add_values(self, src:dict):
"""self and src are both a dict of sets.
"""Merge a plain dict of values into self, which is a dict of sets.
For example from
self == { 'a': set((123,)) }
self == { 'a': {123} }
and
src == { 'a': set((456,)), 'b': set((789,)) }
src == { 'a': 456, 'b': 789 }
then after this function call:
self == { 'a': set((123, 456,)), 'b': set((789,)) }
self == { 'a': {123, 456}, 'b': {789} }
"""
assert isinstance(src, dict)
for key, srcvalset in src.items():
for key, srcval in src.items():
dstvalset = self.get(key)
if dstvalset is None:
dstvalset = set()
self[key] = dstvalset
dstvalset.add(srcvalset)
dstvalset.add(srcval)
def __str__(self):
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
@@ -276,7 +290,7 @@ class BatchAudit(list):
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
"""
def __init__(self, params:List):
def __init__(self, params: ParamList):
assert params
self.params = params
@@ -319,12 +333,15 @@ class BatchAudit(list):
return batch_audit
def to_csv_rows(self, headers=True, sort_key=None):
def to_csv_rows(self, headers=True, sort_key=None, column_blacklist=None):
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
columns = set()
for audit in self:
columns.update(audit.keys())
if column_blacklist:
columns.difference_update(set(column_blacklist))
columns = tuple(sorted(columns, key=sort_key))
if headers:
@@ -333,9 +350,6 @@ class BatchAudit(list):
for audit in self:
yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
def bytes_to_hexstr(b:bytes, sep=''):
return sep.join(f'{x:02x}' for x in b)
def esim_profile_introspect(upp):
pes = ProfileElementSequence.from_der(upp.read())
d = {}
@@ -343,7 +357,7 @@ def esim_profile_introspect(upp):
def show_bytes_as_hexdump(item):
if isinstance(item, bytes):
return bytes_to_hexstr(item)
return b2h(item)
if isinstance(item, list):
return list(show_bytes_as_hexdump(i) for i in item)
if isinstance(item, tuple):
+21 -29
View File
@@ -129,27 +129,24 @@ class RandomSourceMixin:
class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
"""return a different sequence of random decimal digits each"""
name = "random decimal digits"
used_keys = set()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.used_keys = set()
def get_next(self, csv_row:dict=None):
# try to generate random digits that are always different from previously produced random bytes
attempts = 10
while True:
# try to generate random digits that are always different from previously produced random digits
for _ in range(10):
val = self.random_impl.randint(self.first_value, self.last_value)
if val in RandomDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomDigitSource.used_keys.add(val)
break
if val not in self.used_keys:
break
self.used_keys.add(val)
return self.val_to_digit(val)
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
"""return a different sequence of random hexadecimal digits each"""
name = "random hexadecimal digits"
numeric_base = 16
used_keys = set()
def __init__(self, input_str:str):
super().__init__(input_str)
input_str = self.input_str
@@ -161,18 +158,15 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
if (num_digits & 1) != 0:
raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}")
self.num_digits = num_digits
self.used_keys = set()
def get_next(self, csv_row:dict=None):
# try to generate random bytes that are always different from previously produced random bytes
attempts = 10
while True:
for _ in range(10):
val = self.random_impl.randbytes(self.num_digits // 2)
if val in RandomHexDigitSource.used_keys:
attempts -= 1
if attempts:
continue
RandomHexDigitSource.used_keys.add(val)
break
if val not in self.used_keys:
break
self.used_keys.add(val)
return b2h(val)
@@ -181,8 +175,9 @@ class IncDigitSource(DecimalRangeSource):
name = "incrementing decimal digits"
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
"""input_str: the first value to return, a string of an integer number with optional leading zero digits. The
leading zero digits are preserved."""
"""input_str: the range of values to iterate. Format: 'FIRST..LAST' (e.g. '0001..9999') or
just 'FIRST' (iterates to the maximum value for the given digit width). Leading zeros in
FIRST determine the digit width and are preserved in returned values."""
super().__init__(input_str, num_digits, first_value, last_value)
self.next_val = None
self.reset()
@@ -211,12 +206,9 @@ class CsvSource(ParamSource):
name = "from CSV"
def __init__(self, input_str:str):
"""self.csv_column = input_str:
column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
"""
"""Parse input_str into self.num_digits, self.first_value, self.last_value."""
"""input_str: the CSV column name to read values from.
The caller passes the current CSV row to get_next(), from which CsvSource picks the column matching
this name."""
super().__init__(input_str)
self.csv_column = self.input_str
@@ -224,6 +216,6 @@ class CsvSource(ParamSource):
val = None
if csv_row:
val = csv_row.get(self.csv_column)
if not val:
if val is None:
raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}")
return val
+460 -61
View File
@@ -16,23 +16,29 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import enum
import io
import os
import re
import pprint
import json
from typing import List, Tuple, Generator, Optional
from construct.core import StreamError
from osmocom.tlv import camel_to_snake
from osmocom.utils import hexstr
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
from pySim.ts_31_102 import EF_AD
from pySim.ts_31_102 import EF_AD, EF_UST, EF_Routing_Indicator, EF_SUCI_Calc_Info, DF_USIM_5GS
from pySim.ts_51_011 import EF_SMSP
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
from pySim.esim.saip import ProfileElementHeader
from pySim.esim.saip import SecurityDomainKey, SecurityDomainKeyComponent
from pySim.global_platform import KeyUsageQualifier, KeyType
# optimization: instantiate class instance to get the fid only once.
file_path_df_5gs = bytes.fromhex(DF_USIM_5GS().fid)
fid_ri = bytes.fromhex(EF_Routing_Indicator().fid)
fid_sucici = bytes.fromhex(EF_SUCI_Calc_Info().fid)
def unrpad(s: hexstr, c='f') -> hexstr:
return hexstr(s.rstrip(c))
@@ -239,7 +245,7 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
if val is None:
val = v
elif val != v:
raise ValueError(f'get_value_from_pes(): got distinct values: {val!r} != {v!r}')
raise ValueError(f'get_value_from_pes(): got distinct values: {val!r} != {v!r}')
return val
@classmethod
@@ -291,7 +297,9 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
May be overridden by subclasses.
This default implementation returns the maximum allowed value length -- a good fit for most subclasses.
'''
return cls.get_len_range()[1] or 16
l = cls.get_len_range()[1] or 16
l = min(10*80, l)
return l
@classmethod
def is_super_of(cls, other_class):
@@ -421,10 +429,14 @@ class BinaryParam(ConfigurableParameter):
class EnumParam(ConfigurableParameter):
"""ConfigurableParameter for named value enumerations.
Subclasses define an own value_map, and implement their own apply_val() and get_values_from_pes().
"""
value_map = {
# For example:
#'Meaningful label for value 23': 0x23,
# Where 0x23 is a valid value to use for apply_val().
# Where 0x23 is a valid value to use for apply_val(), of any valid type.
}
_value_map_reverse = None
@@ -634,27 +646,22 @@ class SmspTpScAddr(ConfigurableParameter):
# - To generate the right amount of fillFileContent, pass total_len=42 to encode_record_bin().
# - To show the right size in the PES, set f_smsp.rec_len = 42
ef_smsp_dec['alpha_id'] = ''
f_smsp.rec_len = 42
# we can set this to choose a fixed length:
#f_smsp.rec_len = 42
# but leave rec_len unchanged to keep the same length as was found in the eSIM template.
# re-encode into the File body.
#
#print("SMSP (new): %s" % f_smsp.body)
# re-generate the pe.decoded member from the File instance
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1, total_len=f_smsp.rec_len)
# re-generate the pe.decoded member from the File instance
pe.file2pe(f_smsp)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('usim'):
f_smsp = pe.files.get('ef-smsp', None)
if f_smsp is None:
continue
try:
ef_smsp = EF_SMSP()
ef_smsp_dec = ef_smsp.decode_record_bin(f_smsp.body, 1)
except IndexError:
continue
f_smsp = pe.files['ef-smsp']
ef_smsp = EF_SMSP()
ef_smsp_dec = ef_smsp.decode_record_bin(f_smsp.body, 1)
tp_sc_addr = ef_smsp_dec.get('tp_sc_addr', None)
@@ -667,24 +674,12 @@ class SmspTpScAddr(ConfigurableParameter):
yield { cls.name: cls.tuple_to_str((international, digits)) }
class MncLen(ConfigurableParameter):
class MncLen(EnumParam):
"""MNC length. Must be either 2 or 3. Sets only the MNC length field in EF-AD (Administrative Data)."""
name = 'MNC-LEN'
allow_chars = '23'
strip_chars = ' \t\r\n'
numeric_base = 10
max_len = 1
min_len = 1
example_input = '2'
value_map = { '2': 2, '3': 3 }
default_source = param_source.ConstantSource
@classmethod
def validate_val(cls, val):
val = super().validate_val(val)
val = int(val)
if val not in (2, 3):
raise ValueError(f"MNC-LEN must be either 2 or 3, not {val!r}")
return val
example_input = '2'
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
@@ -692,8 +687,10 @@ class MncLen(ConfigurableParameter):
for pe in pes.get_pes_for_type('usim'):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad')
if not f_ad:
continue
# decode existing values
f_ad = pe.files['ef-ad']
if not f_ad.body:
continue
try:
@@ -711,25 +708,24 @@ class MncLen(ConfigurableParameter):
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for naa in ('isim',):# 'isim', 'csim'):
for pe in pes.get_pes_for_type(naa):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad', None)
if f_ad is None:
continue
for pe in pes.get_pes_for_type('usim'):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad', None)
if f_ad is None:
continue
try:
ef_ad = EF_AD()
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
except StreamError:
continue
try:
ef_ad = EF_AD()
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
except StreamError:
continue
mnc_len = ef_ad_dec.get('mnc_len', None)
if mnc_len is None:
continue
mnc_len = ef_ad_dec.get('mnc_len', None)
if mnc_len is None:
continue
yield { cls.name: str(mnc_len) }
yield { cls.name: cls.map_val_to_name(int(mnc_len)) }
class SdKey(BinaryParam):
@@ -738,7 +734,6 @@ class SdKey(BinaryParam):
# these will be set by subclasses
key_type = None
kvn = None
reserved_kvn = tuple() # tuple of all reserved kvn for a given SCPxx
key_id = None
key_usage_qual = None
@@ -784,8 +779,6 @@ class SdKey(BinaryParam):
yield { cls.name: b2h(kc) }
NO_OP = (('', {}))
LEN_128 = (16,)
LEN_128_192_256 = (16, 24, 32)
LEN_128_256 = (16, 32)
@@ -1025,7 +1018,7 @@ class Pin(DecimalHexParam):
for pinCode in pinCodes.decoded['pinCodes'][1]:
if pinCode['keyReference'] == cls.keyReference:
yield { cls.name: cls.decimal_hex_to_str(pinCode['pinValue']) }
yield { cls.name: cls.decimal_hex_to_str(pinCode['pinValue']) }
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
@@ -1103,21 +1096,19 @@ class AlgoConfig(ConfigurableParameter):
yield { cls.name: val }
class AlgorithmID(EnumParam, AlgoConfig):
'''use validate_val() from EnumParam, and apply_val() from AlgoConfig.
In get_values_from_pes(), return enum value names, not raw values.'''
"""use validate_val() from EnumParam, and apply_val() from AlgoConfig.
In get_values_from_pes(), return enum value names, not raw values."""
name = "Algorithm"
# as in pySim/esim/asn1/saip/PE_Definitions-3.3.1.asn
value_map = {
"Milenage" : 1,
"TUAK" : 2,
"usim-test" : 3,
}
algo_config_key = 'algorithmID'
example_input = "Milenage"
default_source = param_source.ConstantSource
algo_config_key = 'algorithmID'
# EnumParam.validate_val() returns the int values from value_map
@classmethod
@@ -1165,7 +1156,7 @@ class MilenageRotationConstants(BinaryParam, AlgoConfig):
class MilenageXoringConstants(BinaryParam, AlgoConfig):
"""XOR-ing constants c1,c2,c3,c4,c5 of Milenage, 128bit each. See 3GPP TS 35.206 Sections 2.3 + 5.3.
Provided as octet-string concatenation of all 5 constants. The default value by 3GPP is the concetenation
Provided as octet-string concatenation of all 5 constants. The default value by 3GPP is the concatenation
of::
00000000000000000000000000000000
@@ -1193,3 +1184,411 @@ class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
max_val = 255
example_input = '1'
default_source = param_source.ConstantSource
numeric_base = None # indicate that this won't need random number sources
class EfUstServiceParam(EnumParam):
"""superclass for EF-UST service flag parameters"""
service_idx = 0
value_map = { 'enabled': True, 'disabled': False }
default_source = param_source.ConstantSource
example_input = sorted(value_map.keys())[0]
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
for pe in pes.get_pes_for_type('usim'):
f_ust = pe.files['ef-ust']
ef_ust = EF_UST()
ust = ef_ust.decode_bin(f_ust.body)
ust[cls.service_idx]['activated'] = val
f_ust.body = ef_ust.encode_bin(ust)
pe.file2pe(f_ust)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('usim'):
f_ust = pe.files.get('ef-ust', None)
if not f_ust:
continue
ef_ust = EF_UST()
try:
ust = ef_ust.decode_bin(f_ust.body)
service_flag = ust[cls.service_idx]['activated']
yield { cls.name: cls.map_val_to_name(service_flag) }
except:
pass
class SuciActive(EfUstServiceParam):
"""EF-UST service nr 124: enable or disable the SUCI service."""
service_idx = 124
name = '5G-SUCI-active'
value_map = { 'SUCI-off': False, 'SUCI-on': True }
example_input = 'SUCI-on'
class SuciInUsim(EfUstServiceParam):
"""EF-UST service nr 125: calculate SUCI in UE or in USIM"""
service_idx = 125
name = '5G-SUCI-in-USIM'
value_map = { 'SUCI-in-UE': False, 'SUCI-in-USIM': True }
example_input = 'SUCI-in-USIM'
class SuciRi(ConfigurableParameter):
"""SUCI Routing Indicator as in section 4.4.11.11 of 3GPP TS 31.102"""
name = '5G-SUCI-RI'
allow_chars = '0123456789'
min_len = 1
max_len = 4
allow_types = (str,)
example_input = '0'
default_source = param_source.ConstantSource
KEY_RI = "routing_indicator"
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
for pe in pes.get_pes_for_type('df-5gs'):
f_ri = pe.files.get('ef-routing-indicator', None)
if f_ri is None:
continue
ef_ri = EF_Routing_Indicator()
ri = ef_ri.decode_bin(f_ri.body)
ri[cls.KEY_RI] = str(val)
f_ri.body = ef_ri.encode_bin(ri)
pe.file2pe(f_ri)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('df-5gs'):
f_ri = pe.files.get('ef-routing-indicator', None)
if f_ri is None:
continue
ef_ri = EF_Routing_Indicator()
try:
ri = ef_ri.decode_bin(f_ri.body)
yield { cls.name: ri.get(cls.KEY_RI) }
except:
pass
class SuciCalcInfoParameter(ConfigurableParameter):
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102"""
name = '5G-SUCI-CalcInfo'
default_source = param_source.ConstantSource
allow_types = (str,)
max_len = 4096 # to indicate a large input field to UI renderers
example_input = '{"prot_scheme_id_list": [{"priority": 0, "identifier": 0, "key_index": 0}], "hnet_pubkey_list": []}'
PE_IN_UE = ("df-5gs", "ef-suci-calc-info")
PE_IN_USIM = ("df-saip", "ef-suci-calc-info-usim")
suci_calc_info_pe = None
@classmethod
def validate_val(cls, val):
val = super().validate_val(val)
if not val:
val = "{}"
# check that it is a dict something like
# {
# "prot_scheme_id_list": [
# {"priority": 0, "identifier": 2, "key_index": 1},
# {"priority": 1, "identifier": 1, "key_index": 2},
# ],
# "hnet_pubkey_list": [
# {"hnet_pubkey_identifier": 27,
# "hnet_pubkey": "0472DA71976234CE833A6907425867B82E074D44EF907DFB4B3E21C1C2256EBCD15A7DED52FCBB097A4ED250E036C7B9C8C7004C4EEDC4F068CD7BF8D3F900E3B4"},
# {"hnet_pubkey_identifier": 30,
# "hnet_pubkey": "5A8D38864820197C3394B92613B20B91633CBD897119273BF8E4A6F4EEC0A650"},
# ],
# }
try:
d = json.loads(val)
except json.decoder.JSONDecodeError as e:
raise ValueError(f"Cannot parse SUCI Calc Info: {e}") from e
KEY_PSI_LIST = 'prot_scheme_id_list'
KEY_HPK_LIST = 'hnet_pubkey_list'
KEYS_D = set((KEY_HPK_LIST, KEY_PSI_LIST))
KEYS_PSI = set(('identifier', 'key_index', 'priority'))
KEYS_HPK = set(('hnet_pubkey_identifier', 'hnet_pubkey'))
if not d:
d = { KEY_PSI_LIST: [], KEY_HPK_LIST: [] }
if not (isinstance(d, dict)
and set(d.keys()) == KEYS_D):
raise ValueError(f"Unexpected structure in SUCI Calc Info: expected dict with entries {KEYS_D}")
psi = d.get(KEY_PSI_LIST, None)
if not all((set(e.keys()) == KEYS_PSI) for e in psi):
raise ValueError("Unexpected structure in SUCI Calc Info:"
f" in {KEY_PSI_LIST}, expected dict with entries {KEYS_PSI}")
hpk = d.get(KEY_HPK_LIST, None)
if not all((set(e.keys()) == KEYS_HPK) for e in hpk):
raise ValueError("Unexpected structure in SUCI Calc Info:"
f" in {KEY_HPK_LIST}, expected dict with entries {KEYS_HPK}")
return d
@classmethod
def _apply_suci(cls, pes: ProfileElementSequence, val, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
for pe in pes.get_pes_for_type(pe_type):
f_sucici = pe.files.get(pe_file, None)
if not f_sucici:
continue
ef_sucici = EF_SUCI_Calc_Info()
body = ef_sucici.encode_bin(val)
# 0xff pad up to the existing file size, so that the underlying template doesn't come through
is_size = f_sucici.file_size
pad_n = is_size - len(body)
if pad_n > 0:
body = body + b'\xff' * pad_n
f_sucici.body = body
pe.file2pe(f_sucici)
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
cls._apply_suci(pes, val, *cls.suci_calc_info_pe)
@staticmethod
def normalize_sucici(sucici:dict):
"""Normalize the CalcInfo dict so it can be json encoded:
convert bytes to hex strings."""
if not sucici:
sucici = {}
for hnet_pubkey in sucici.get('hnet_pubkey_list', ()):
val = hnet_pubkey['hnet_pubkey']
if isinstance(val, bytes):
val = b2h(val)
hnet_pubkey['hnet_pubkey'] = val
return sucici
@classmethod
def _get_suci(cls, pes: ProfileElementSequence, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
for pe in pes.get_pes_for_type(pe_type):
f_sucici = pe.files.get(pe_file, None)
if not f_sucici:
continue
ef_sucici = EF_SUCI_Calc_Info()
sucici = ef_sucici.decode_bin(f_sucici.body)
# normalize to string (bytes cannot go into json)
sucici = cls.normalize_sucici(sucici)
yield { cls.name: json.dumps(sucici) }
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
yield from cls._get_suci(pes, *cls.suci_calc_info_pe)
class SuciCalcInfoUe(SuciCalcInfoParameter):
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102, readable by UE (DF-5GS)"""
name = '5G-SUCI-CalcInfo-UE'
suci_calc_info_pe = SuciCalcInfoParameter.PE_IN_UE
class SuciCalcInfoUsim(SuciCalcInfoParameter):
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102, readable only by USIM (DF-SAIP)"""
name = '5G-SUCI-CalcInfo-USIM'
suci_calc_info_pe = SuciCalcInfoParameter.PE_IN_USIM
def gfm_find(pes: ProfileElementSequence, file_path:bytes, ef_fid:bytes):
"""look through genericFileManagement PE and return the fmc list with start and end indexes as
(fmc_list, first_idx, after_last_idx)
so that fmc_list[first_idx:after_last_idx] is the slice of file management commands relevant to the given
file_path/ef_fid.
"""
for pe in pes.get_pes_for_type('genericFileManagement'):
path_match = False
creating_fid = False
for fmc in pe.decoded['fileManagementCMD']:
first = None
last = None
for idx in range(len(fmc)):
cmd, arg = fmc[idx]
if cmd == 'filePath':
path_match = (arg == file_path)
if not path_match:
creating_fid = False
elif path_match and cmd == 'createFCP':
creating_fid = (arg.get('fileID') == ef_fid)
if creating_fid:
if first is None:
first = idx
last = idx
first = min(first, idx)
last = max(last, idx)
if first is not None:
yield fmc, first, last + 1
# genericFileManagement 5G params
def pes_get_adf_fid(pes:ProfileElementSequence, naa_name="usim", adf_name="adf-usim"):
adf = pes.get_pe_for_type(naa_name)
return adf.decoded[adf_name][0][1]['fileID']
def mk_adf_df_path(pes, naa:str, adf:str, file_path:bytes) -> bytes:
adf_file_id = pes_get_adf_fid(pes, naa, adf)
return b''.join((adf_file_id, file_path))
def gfm_get_file_content(pes: ProfileElementSequence, naa:str, adf:str, file_path:bytes, ef_fid:bytes) -> bytes:
'''find a given file in the genericFileManagement section, and return the bytes from the first fillFileContent
item.
TODO: implement File.from_gfm() and return the full resulting bytes?
'''
adf_df_path = mk_adf_df_path(pes, naa, adf, file_path)
data = []
for fmc, first_idx, after_last_idx in gfm_find(pes, adf_df_path, ef_fid):
assert fmc[first_idx][0] == 'createFCP'
assert after_last_idx > first_idx
idx = first_idx + 1
while idx < after_last_idx:
if fmc[idx][0] == 'fillFileContent':
data.append(fmc[idx][1])
idx += 1
return data
def gfm_set_file_content(pes: ProfileElementSequence, naa:str, adf:str, file_path:bytes, ef_fid:bytes, file_content:bytes) -> int:
adf_df_path = mk_adf_df_path(pes, naa, adf, file_path)
found = 0
for fmc, first_idx, after_last_idx in gfm_find(pes, adf_df_path, ef_fid):
assert fmc[first_idx][0] == 'createFCP'
assert after_last_idx > first_idx
new_fmc = [
fmc[first_idx],
('fillFileContent', file_content),
]
new_fmc[0][1]['efFileSize'] = bytes((len(file_content), ))
fmc[first_idx:after_last_idx] = new_fmc
found += 1
return found
class GfmSuciRi(SuciRi):
"""SUCI Routing Indicator as in section 4.4.11.11 of 3GPP TS 31.102,
applied via General File Management. Intended for SAIP 2.1 profiles."""
name = 'GFM-5G-SUCI-RI'
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
ri = {
"routing_indicator": str(val),
"rfu": "ffff"
}
ef_ri = EF_Routing_Indicator()
found = gfm_set_file_content(pes, 'usim', 'adf-usim', file_path_df_5gs, fid_ri,
ef_ri.encode_bin(ri))
if not found:
raise ValueError(f"No target file found, Cannot apply {cls.name} = {ri}")
data = gfm_get_file_content(pes, 'usim', 'adf-usim', file_path_df_5gs, fid_ri)
val = ef_ri.decode_bin(b''.join(data))
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
data = gfm_get_file_content(pes, 'usim', 'adf-usim', file_path_df_5gs, fid_ri)
if not data:
return
data = b''.join(data)
if not data:
return
ef_ri = EF_Routing_Indicator()
ri = ef_ri.decode_bin(data)
yield { cls.name: ri.get(cls.KEY_RI) }
class GfmSuciCalcInfoUe(SuciCalcInfoUe):
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102, readable by UE (DF-5GS),
applied via General File Management. Intended for SAIP 2.1 profiles."""
name = 'GFM-5G-SUCI-CalcInfo-UE'
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
if not isinstance(val, dict):
raise ValueError("val should be a dict, after 'val = SuciCalcInfoParameter.validate_val(val)'")
ef_sucici = EF_SUCI_Calc_Info()
body = ef_sucici.encode_bin(val)
gfm_set_file_content(pes, 'usim', 'adf-usim', file_path_df_5gs, fid_sucici,
body)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
data = gfm_get_file_content(pes, 'usim', 'adf-usim', file_path_df_5gs, fid_sucici)
if not data:
return
data = b''.join(data)
if not data:
return
ef_sucici = EF_SUCI_Calc_Info()
sucici = ef_sucici.decode_bin(data)
sucici = cls.normalize_sucici(sucici)
yield { cls.name: json.dumps(sucici) }
class EuiccMandatoryServiceParam(EnumParam):
"""superclass for managing items of the ProfileHeader / eUICC-Mandatory-services ServicesList"""
service_name = None
value_map = { 'mandatory': True, 'optional': False }
default_source = param_source.ConstantSource
example_input = sorted(value_map.keys())[0]
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
for pe in pes.get_pes_for_type('header'):
assert isinstance(pe, ProfileElementHeader)
if val:
pe.mandatory_service_add(cls.service_name)
else:
# explicitly check to avoid exception when then service is already not present
if pe.mandatory_service_present(cls.service_name):
pe.mandatory_service_remove(cls.service_name)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('header'):
assert isinstance(pe, ProfileElementHeader)
val = bool(pe.mandatory_service_present(cls.service_name))
yield { cls.name: cls.map_val_to_name(val) }
class EuiccMandatoryServiceGetIdentity(EuiccMandatoryServiceParam):
"""eUICC Mandatory Services: get-identity. The eUICC must be capable of providing a 5G identity using SUCI-CalcInfo
located in the USIM's DF-SAIP, see parameter 5G-SUCI-CalcInfo-USIM."""
name = '5G-eUICC-get-identity'
service_name = 'get-identity'
class EuiccMandatoryServiceProfileA(EuiccMandatoryServiceParam):
"""eUICC Mandatory Services: profile-a-x25519. The eUICC must be able to estblish a 5G identity using an X25519 key,
as provided in a profile-A ("identifier": 1) key in SUCI-CalcInfo located in the USIM's DF-SAIP, see parameter
5G-SUCI-CalcInfo-USIM."""
name = '5G-eUICC-profile-a-x25519'
service_name = 'profile-a-x25519'
class EuiccMandatoryServiceProfileB(EuiccMandatoryServiceParam):
"""eUICC Mandatory Services: profile-b-p256. The eUICC must be able to estblish a 5G identity using a P256 key, as
provided in a profile-B ("identifier": 2) key in SUCI-CalcInfo located in the USIM's DF-SAIP, see parameter
5G-SUCI-CalcInfo-USIM."""
name = '5G-eUICC-profile-b-p256'
service_name = 'profile-b-p256'
+41 -3
View File
@@ -226,9 +226,28 @@ class Icon(BER_TLV_IE, tag=0x94):
_construct = GreedyBytes
class ProfileClass(BER_TLV_IE, tag=0x95):
_construct = Enum(Int8ub, test=0, provisioning=1, operational=2)
class ProfilePolicyRules(BER_TLV_IE, tag=0x99):
_construct = GreedyBytes
class NotificationConfigurationInfo(BER_TLV_IE, tag=0xb6):
_construct = GreedyBytes
# ProfileOwner
class ProfileOwnerPLMN(BER_TLV_IE, tag=0x80):
_construct = PlmnAdapter(Bytes(3))
class ProfileOwnerGID1(BER_TLV_IE, tag=0x81):
_construct = GreedyBytes
class ProfileOwnerGID2(BER_TLV_IE, tag=0x82):
_construct = GreedyBytes
class ProfileOwner(BER_TLV_IE, tag=0xb7, nested=[ProfileOwnerPLMN, ProfileOwnerGID1, ProfileOwnerGID2]):
_construct = GreedyBytes
class SMDPPProprietaryData(BER_TLV_IE, tag=0xb8):
_construct = GreedyBytes
class ProfileInfo(BER_TLV_IE, tag=0xe3, nested=[Iccid, IsdpAid, ProfileState, ProfileNickname,
ServiceProviderName, ProfileName, IconType, Icon,
ProfileClass]): # FIXME: more IEs
ProfileClass, ProfilePolicyRules, NotificationConfigurationInfo,
ProfileOwner, SMDPPProprietaryData]):
pass
class ProfileInfoSeq(BER_TLV_IE, tag=0xa0, nested=[ProfileInfo]):
pass
@@ -444,9 +463,28 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
d = rn.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['notification_sent_resp']))
def do_get_profiles_info(self, _opts):
get_profiles_info_parser = argparse.ArgumentParser()
get_profiles_info_parser.add_argument('--all', action='store_true', help='Retrieve all known tags of a profile')
@cmd2.with_argparser(get_profiles_info_parser)
def do_get_profiles_info(self, opts):
"""Perform an ES10c GetProfilesInfo function."""
pi = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, ProfileInfoListReq(), ProfileInfoListResp)
if opts.all:
tags = [nest.tag for nest in ProfileInfo.nested_collection_cls().nested]
u8tags = []
# TODO: rework TagList to support 2 byte tags to not filter it into u8 tags
for tag in tags:
if tag <= 255:
u8tags.append(tag)
elif tag <= 65535:
u8tags.append(tag >> 8)
u8tags.append(tag & 0xff)
# Ignoring 3 byte tags
req = ProfileInfoListReq(children=[TagList(decoded=u8tags)])
else:
req = ProfileInfoListReq()
pi = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, req, ProfileInfoListResp)
d = pi.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['profile_info_list_resp']))
+5 -2
View File
@@ -44,6 +44,7 @@ from pySim.utils import sw_match, decomposeATR
from pySim.jsonpath import js_path_modify
from pySim.commands import SimCardCommands
from pySim.exceptions import SwMatchError
from pySim.log import PySimLogger
# int: a single service is associated with this file
# list: any of the listed services requires this file
@@ -52,6 +53,8 @@ CardFileService = Union[int, List[int], Tuple[int, ...]]
Size = Tuple[int, Optional[int]]
log = PySimLogger.get(__name__)
class CardFile:
"""Base class for all objects in the smart card filesystem.
Serve as a common ancestor to all other file types; rarely used directly.
@@ -1609,14 +1612,14 @@ class CardModel(abc.ABC):
card_atr = scc.get_atr()
for atr in cls._atrs:
if atr == card_atr:
print("Detected CardModel:", cls.__name__)
log.info("Detected CardModel: %s", cls.__name__)
return True
# if nothing found try to just compare the Historical Bytes of the ATR
card_atr_hb = decomposeATR(card_atr)['hb']
for atr in cls._atrs:
atr_hb = decomposeATR(atr)['hb']
if atr_hb == card_atr_hb:
print("Detected CardModel:", cls.__name__)
log.info("Detected CardModel: %s", cls.__name__)
return True
return False
+17 -17
View File
@@ -27,9 +27,9 @@ from osmocom.utils import b2h
from osmocom.tlv import bertlv_parse_len, bertlv_encode_len
from pySim.utils import parse_command_apdu
from pySim.secure_channel import SecureChannel
from pySim.log import PySimLogger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
log = PySimLogger.get(__name__)
def scp02_key_derivation(constant: bytes, counter: int, base_key: bytes) -> bytes:
assert len(constant) == 2
@@ -75,7 +75,7 @@ class Scp02SessionKeys:
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
h = d.decrypt(h)
h = e.encrypt(h)
logger.debug("mac_1des(%s,icv=%s) -> %s", b2h(data), b2h(icv), b2h(h))
log.debug("mac_1des(%s,icv=%s) -> %s", b2h(data), b2h(icv), b2h(h))
if self.des_icv_enc:
self.icv = self.des_icv_enc.encrypt(h)
else:
@@ -89,7 +89,7 @@ class Scp02SessionKeys:
h = b'\x00' * 8
for i in range(q):
h = e.encrypt(strxor(h, bytes(padded_data[8*i:8*(i+1)])))
logger.debug("mac_3des(%s) -> %s", b2h(data), b2h(h))
log.debug("mac_3des(%s) -> %s", b2h(data), b2h(h))
return h
def __init__(self, counter: int, card_keys: 'GpCardKeyset', icv_encrypt=True):
@@ -276,10 +276,10 @@ class SCP02(SCP):
return cipher.decrypt(ciphertext)
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(host_challenge), b2h(card_challenge))
log.debug("host_challenge(%s), card_challenge(%s)", b2h(host_challenge), b2h(card_challenge))
self.host_cryptogram = self.sk.calc_mac_3des(self.sk.counter.to_bytes(2, 'big') + card_challenge + host_challenge)
self.card_cryptogram = self.sk.calc_mac_3des(self.host_challenge + self.sk.counter.to_bytes(2, 'big') + card_challenge)
logger.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
log.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
def gen_init_update_apdu(self, host_challenge: bytes = b'\x00'*8) -> bytes:
"""Generate INITIALIZE UPDATE APDU."""
@@ -291,7 +291,7 @@ class SCP02(SCP):
resp = self.constr_iur.parse(resp_bin)
self.card_challenge = resp['card_challenge']
self.sk = Scp02SessionKeys(resp['seq_counter'], self.card_keys)
logger.debug(self.sk)
log.debug(self.sk)
self._compute_cryptograms(self.card_challenge, self.host_challenge)
if self.card_cryptogram != resp['card_cryptogram']:
raise ValueError("card cryptogram doesn't match")
@@ -311,7 +311,7 @@ class SCP02(SCP):
def _wrap_cmd_apdu(self, apdu: bytes, *args, **kwargs) -> bytes:
"""Wrap Command APDU for SCP02: calculate MAC and encrypt."""
logger.debug("wrap_cmd_apdu(%s)", b2h(apdu))
log.debug("wrap_cmd_apdu(%s)", b2h(apdu))
if not self.do_cmac:
return apdu
@@ -378,7 +378,7 @@ def scp03_key_derivation(constant: bytes, context: bytes, base_key: bytes, l: Op
if l is None:
l = len(base_key) * 8
logger.debug("scp03_kdf(constant=%s, context=%s, base_key=%s, l=%u)", b2h(constant), b2h(context), b2h(base_key), l)
log.debug("scp03_kdf(constant=%s, context=%s, base_key=%s, l=%u)", b2h(constant), b2h(context), b2h(base_key), l)
output_len = l // 8
# SCP03 Section 4.1.5 defines a different parameter order than NIST SP 800-108, so we cannot use the
# existing Cryptodome.Protocol.KDF.SP800_108_Counter function :(
@@ -451,7 +451,7 @@ class Scp03SessionKeys:
# This block SHALL be encrypted with S-ENC to produce the ICV for command encryption.
cipher = AES.new(self.s_enc, AES.MODE_CBC, iv)
icv = cipher.encrypt(data)
logger.debug("_get_icv(data=%s, is_resp=%s) -> icv=%s", b2h(data), is_response, b2h(icv))
log.debug("_get_icv(data=%s, is_resp=%s) -> icv=%s", b2h(data), is_response, b2h(icv))
return icv
# TODO: Resolve duplication with pySim.esim.bsp.BspAlgoCryptAES128 which provides pad80-wrapping
@@ -489,12 +489,12 @@ class SCP03(SCP):
return cipher.decrypt(ciphertext)
def _compute_cryptograms(self):
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(self.host_challenge), b2h(self.card_challenge))
log.debug("host_challenge(%s), card_challenge(%s)", b2h(self.host_challenge), b2h(self.card_challenge))
# Card + Host Authentication Cryptogram: Section 6.2.2.2 + 6.2.2.3
context = self.host_challenge + self.card_challenge
self.card_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_CARD, context, self.sk.s_mac, l=self.s_mode*8)
self.host_cryptogram = scp03_key_derivation(self.sk.DERIV_CONST_AUTH_CGRAM_HOST, context, self.sk.s_mac, l=self.s_mode*8)
logger.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
log.debug("host_cryptogram(%s), card_cryptogram(%s)", b2h(self.host_cryptogram), b2h(self.card_cryptogram))
def gen_init_update_apdu(self, host_challenge: Optional[bytes] = None) -> bytes:
"""Generate INITIALIZE UPDATE APDU."""
@@ -514,7 +514,7 @@ class SCP03(SCP):
self.i_param = resp['i_param']
# derive session keys and compute cryptograms
self.sk = Scp03SessionKeys(self.card_keys, self.host_challenge, self.card_challenge)
logger.debug(self.sk)
log.debug(self.sk)
self._compute_cryptograms()
# verify computed cryptogram matches received cryptogram
if self.card_cryptogram != resp['card_cryptogram']:
@@ -529,7 +529,7 @@ class SCP03(SCP):
def _wrap_cmd_apdu(self, apdu: bytes, skip_cenc: bool = False) -> bytes:
"""Wrap Command APDU for SCP03: calculate MAC and encrypt."""
logger.debug("wrap_cmd_apdu(%s)", b2h(apdu))
log.debug("wrap_cmd_apdu(%s)", b2h(apdu))
if not self.do_cmac:
return apdu
@@ -584,7 +584,7 @@ class SCP03(SCP):
# status word: in this case only the status word shall be returned in the response. All status words
# except '9000' and warning status words (i.e. '62xx' and '63xx') shall be interpreted as error status
# words.
logger.debug("unwrap_rsp_apdu(sw=%s, rsp_apdu=%s)", sw, rsp_apdu)
log.debug("unwrap_rsp_apdu(sw=%s, rsp_apdu=%s)", sw, rsp_apdu)
if not self.do_rmac:
assert not self.do_renc
return rsp_apdu
@@ -600,9 +600,9 @@ class SCP03(SCP):
if self.do_renc:
# decrypt response data
decrypted = self.sk._decrypt(response_data)
logger.debug("decrypted: %s", b2h(decrypted))
log.debug("decrypted: %s", b2h(decrypted))
# remove padding
response_data = unpad80(decrypted)
logger.debug("response_data: %s", b2h(response_data))
log.debug("response_data: %s", b2h(response_data))
return response_data
+2 -1
View File
@@ -152,7 +152,8 @@ class SimCard(SimCardBase):
return sw
def update_smsp(self, smsp):
data, sw = self._scc.update_record(EF['SMSP'], 1, rpad(smsp, 84))
print("using update_smsp")
data, sw = self._scc.update_record(EF['SMSP'], 1, smsp, leftpad=True)
return sw
def update_ad(self, mnc=None, opmode=None, ofm=None, path=EF['AD']):
+1 -1
View File
@@ -44,7 +44,7 @@ class PySimLogger:
"""
LOG_FMTSTR = "%(levelname)s: %(message)s"
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- " + LOG_FMTSTR
LOG_FMTSTR_VERBOSE = "%(name)s.%(lineno)d -- " + LOG_FMTSTR
__formatter = logging.Formatter(LOG_FMTSTR)
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
+46 -16
View File
@@ -301,24 +301,54 @@ class LinkBaseTpdu(LinkBase):
prev_tpdu = tpdu
data, sw = self.send_tpdu(tpdu)
log.debug("T0: case #%u TPDU: %s => %s %s", case, tpdu, data or "(no data)", sw or "(no status word)")
if sw is None:
raise ValueError("no status word received")
# When we have sent the first APDU, the SW may indicate that there are response bytes
# available. There are two SWs commonly used for this 9fxx (sim) and 61xx (usim), where
# xx is the number of response bytes available.
# See also:
if sw is not None:
while (sw[0:2] in ['9f', '61', '62', '63']):
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
tpdu_gr = tpdu[0:2] + 'c00000' + sw[2:4]
# After sending the APDU/TPDU the UICC/eUICC or SIM may response with a status word that indicates that further
# TPDUs have to be sent in order to complete the task.
if case == 4 or self.apdu_strict == False:
# In case the APDU is a case #4 APDU, the UICC/eUICC/SIM may indicate that there is response data
# available which has to be retrieved using a GET RESPONSE command TPDU.
#
# ETSI TS 102 221, section 7.3.1.1.4 is very cleare about the fact that the GET RESPONSE mechanism
# shall only apply on case #4 APDUs but unfortunately it is impossible to distinguish between case #3
# and case #4 when the APDU format is not strictly followed. In order to be able to detect case #4
# correctly the Le byte (usually 0x00) must be present, is often forgotten. To avoid problems with
# legacy scripts that use raw APDU strings, we will still loosely apply GET RESPONSE based on what
# the status word indicates. Unless the user explicitly enables the strict mode (set apdu_strict true)
while True:
if sw in ['9000', '9100']:
# A status word of 9000 (or 9100 in case there is pending data from a proactive SIM command)
# indicates that either no response data was returnd or all response data has been retrieved
# successfully. We may discontinue the processing at this point.
break;
if sw[0:2] in ['61', '9f']:
# A status word of 61xx or 9fxx indicates that there is (still) response data available. We
# send a GET RESPONSE command with the length value indicated in the second byte of the status
# word. (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4a and 3GPP TS 51.011 9.4.1 and
# ISO/IEC 7816-4, Table 5)
le_gr = sw[2:4]
elif sw[0:2] in ['62', '63']:
# There are corner cases (status word is 62xx or 63xx) where the UICC/eUICC/SIM asks us
# to send a dummy GET RESPONSE command. We send a GET RESPONSE command with a length of 0.
# (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4b and ETSI TS 151 011, section 9.4.1)
le_gr = '00'
else:
# A status word other then the ones covered by the above logic may indicate an error. In this
# case we will discontinue the processing as well.
# (see also ETSI TS 102 221, section 7.3.1.1.4, clause 4c)
break
tpdu_gr = tpdu[0:2] + 'c00000' + le_gr
prev_tpdu = tpdu_gr
d, sw = self.send_tpdu(tpdu_gr)
data += d
if sw[0:2] == '6c':
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
tpdu_gr = prev_tpdu[0:8] + sw[2:4]
data, sw = self.send_tpdu(tpdu_gr)
data_gr, sw = self.send_tpdu(tpdu_gr)
log.debug("T0: GET RESPONSE TPDU: %s => %s %s", tpdu_gr, data_gr or "(no data)", sw or "(no status word)")
data += data_gr
if sw[0:2] == '6c':
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
tpdu_gr = prev_tpdu[0:8] + sw[2:4]
data, sw = self.send_tpdu(tpdu_gr)
log.debug("T0: repated case #%u TPDU: %s => %s %s", case, tpdu_gr, data or "(no data)", sw or "(no status word)")
return data, sw
+1 -1
View File
@@ -327,7 +327,7 @@ class EF_SUCI_Calc_Info(TransparentEF):
"""conversion method to generate list of {hnet_pubkey_identifier, hnet_pubkey} dicts
from flat [{hnet_pubkey_identifier: }, {net_pubkey: }, ...] list"""
out = []
while len(l):
while l:
a = l.pop(0)
b = l.pop(0)
z = {**a, **b}
+51 -8
View File
@@ -251,6 +251,16 @@ class EF_SMSP(LinFixedEF):
"numbering_plan_id": "isdn_e164" },
"call_number": "4915790109999" },
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
( 'e1ffffffffffffffffffffffff0891945197109099f9ffffff0000a9',
{ "alpha_id": "", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": True },
"tp_dest_addr": { "length": 255, "ton_npi": { "ext": True, "type_of_number": "reserved_for_extension",
"numbering_plan_id": "reserved_for_extension" },
"call_number": "" },
"tp_sc_addr": { "length": 8, "ton_npi": { "ext": True, "type_of_number": "international",
"numbering_plan_id": "isdn_e164" },
"call_number": "4915790109999" },
"tp_pid": b"\x00", "tp_dcs": b"\x00", "tp_vp_minutes": 4320 } ),
( '454e6574776f726b73fffffffffffffff1ffffffffffffffffffffffffffffffffffffffffffffffff0000a7',
{ "alpha_id": "ENetworks", "parameter_indicators": { "tp_dest_addr": False, "tp_sc_addr": True,
"tp_pid": True, "tp_dcs": True, "tp_vp": False },
@@ -331,7 +341,8 @@ class EF_SMSP(LinFixedEF):
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
DestAddr = Struct('length'/Rebuild(Int8ub, lambda ctx: EF_SMSP.dest_addr_len(ctx)),
'ton_npi'/TonNpi, 'call_number'/PaddedBcdAdapter(Rpad(Bytes(10))))
self._construct = Struct('alpha_id'/COptional(GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28)))),
# (see comment below)
self._construct = Struct('alpha_id'/GsmOrUcs2Adapter(Rpad(Bytes(this._.total_len-28))),
'parameter_indicators'/InvertAdapter(BitStruct(
Const(7, BitsInteger(3)),
'tp_vp'/Flag,
@@ -345,6 +356,25 @@ class EF_SMSP(LinFixedEF):
'tp_dcs'/Bytes(1),
'tp_vp_minutes'/EF_SMSP.ValidityPeriodAdapter(Byte))
# Ensure 'alpha_id' is always present
def encode_record_hex(self, abstract_data: dict, record_nr: int, total_len: int = None) -> str:
# Problem: TS 51.011 Section 10.5.6 describes the 'alpha_id' field as optional. However, this is only true
# at the time when the record length of the file is set up in the file system. A card manufacturer may decide
# to remove the field by setting the record length to 28. Likewise, the card manaufacturer may also decide to
# set the field to a distinct length by setting the record length to a value greater than 28 (e.g. 14 bytes
# 'alpha_id' + 28 bytes). Due to the fixed nature of the record length, this eventually means that in practice
# 'alpha_id' is a mandatory field with a fixed length.
#
# Due to the problematic specification of 'alpha_id' as a pseudo-optional field at the beginning of a
# fixed-size memory, the construct definition in self._construct has been incorrectly implemented and the field
# has been marked as COptional. We may correct the problem by removing COptional. But to maintain compatibility,
# we then have to ensure that in case the field is not provided (None), it is set to an empty string ('').
#
# See also ts_31_102.py, class EF_OCI for a correct example.
if abstract_data['alpha_id'] is None:
abstract_data['alpha_id'] = ''
return super().encode_record_hex(abstract_data, record_nr, total_len)
# TS 51.011 Section 10.5.7
class EF_SMSS(TransparentEF):
class MemCapAdapter(Adapter):
@@ -1233,9 +1263,11 @@ class CardProfileSIM(CardProfile):
@staticmethod
def decode_select_response(resp_hex: str) -> object:
# we try to build something that resembles a dict resulting from the TLV decoder
# of TS 102.221 (FcpTemplate), so that higher-level code only has to deal with one
# format of SELECT response
"""
Decode the select response to a dict representation, similar to the one of TS 102.221 (see ts_102_221.py,
class FcpTemplate), so that higher-level code only has to deal with one respresentation. See also
3GPP TS 51.011, section 9.2.1
"""
resp_bin = h2b(resp_hex)
struct_of_file_map = {
0: 'transparent',
@@ -1273,13 +1305,24 @@ class CardProfileSIM(CardProfile):
record_len = resp_bin[14]
ret['file_descriptor']['record_len'] = record_len
ret['file_descriptor']['num_of_rec'] = ret['file_size'] // record_len
ret['access_conditions'] = b2h(resp_bin[8:10])
if resp_bin[11] & 0x01 == 0:
ret['access_conditions'] = b2h(resp_bin[8:11])
# Life cycle status integer, see also ETSI TS 102 221, table 11.7b
lcsi = resp_bin[11]
if lcsi == 0x00:
ret['life_cycle_status_int'] = 'no_information'
elif lcsi == 0x01:
ret['life_cycle_status_int'] = 'creation'
elif lcsi == 0x03:
ret['life_cycle_status_int'] = 'initialization'
elif lcsi & 0xFD == 0x05:
ret['life_cycle_status_int'] = 'operational_activated'
elif resp_bin[11] & 0x04:
elif lcsi & 0xFD == 0x04:
ret['life_cycle_status_int'] = 'operational_deactivated'
elif lcsi & 0xFC == 0x0C:
ret['life_cycle_status_int'] = 'termination'
else:
ret['life_cycle_status_int'] = 'terminated'
ret['life_cycle_status_int'] = lcsi
return ret
@classmethod
+4
View File
@@ -4,3 +4,7 @@ build-backend = "setuptools.build_meta"
[tool.pylint.main]
ignored-classes = ["twisted.internet.reactor"]
[tool.pylint.TYPECHECK]
# SdKey subclasses are generated dynamically via SdKey.generate_sd_key_classes()
generated-members = ["SdKey[A-Za-z0-9]+"]
Binary file not shown.
+1 -1
View File
@@ -5,7 +5,7 @@ ICCID: 8988219000000117833
IMSI: 001010000000111
GID1: ffffffffffffffff
GID2: ffffffffffffffff
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: Fairwaves
Show in HPLMN: False
+1 -1
View File
@@ -5,7 +5,7 @@ ICCID: 89445310150011013678
IMSI: 001010000000102
GID1: Can't read file -- SW match failed! Expected 9000 and got 6a82.
GID2: Can't read file -- SW match failed! Expected 9000 and got 6a82.
SMSP: e1ffffffffffffffffffffffff0581005155f5ffffffffffff000000ffffffffffffffffffffffffffff
SMSP: ffffffffffffffffffffffffffffe1ffffffffffffffffffffffff0581005155f5ffffffffffff000000
SMSC: 0015555
SPN: wavemobile
Show in HPLMN: False
@@ -7,10 +7,24 @@ set apdu_strict true
# No command data field, No response data field present
apdu 00700001 --expect-sw 9000 --expect-response-regex '^$'
# Case #1: (verify pin)
# This command returns the number of remaining authentication attempts in the
# form of a status that has the form 63cX, where X is the number of remaining
# attempts. Such a status word can be easily confused with the response to a
# case #4 APDU. This test checks if the transport layer correctly distinguishes
# the between APDU case #1 and APDU case #4.
apdu 0020000A --expect-sw 63c? --expect-response-regex '^$'
# Case #2: (status)
# No command data field, Response data field present
apdu 80F2000000 --expect-sw 9000 --expect-response-regex '^[a-fA-F0-9]+$'
# Case #2: (verify pin)
# (see also above). This test checks if the transport layer is also able to
# distinguish correctly between APDU case #2 (with zero length response) and
# APDU case #4.
apdu 0020000A00 --expect-sw 63c? --expect-response-regex '^$'
# Case #3: (terminal capability)
# Command data field present, No response data field
apdu 80AA000005a903830180 --expect-sw 9000 --expect-response-regex '^$'
@@ -1,6 +1,6 @@
#!/bin/bash
# Utility to verify the functionality of pySim-trace.py
# Utility to verify the functionality of pySim-smpp2sim.py
#
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
+2 -1
View File
@@ -20,7 +20,8 @@ class TestCardKeyProviderCsv(unittest.TestCase):
"KIK3" : "00010204040506070809488B0C0D0E0F"}
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
card_key_field_cryptor = CardKeyFieldCryptor(column_keys)
card_key_provider_register(CardKeyProviderCsv(csv_file_path, card_key_field_cryptor))
super().__init__(*args, **kwargs)
def test_card_key_provider_get(self):
+320 -22
View File
@@ -17,10 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import enum
import io
import sys
import unittest
import io
import json
from importlib import resources
from osmocom.utils import hexstr
from pySim.esim.saip import ProfileElementSequence
@@ -53,18 +54,21 @@ class ConfigurableParameterTest(unittest.TestCase):
def test_parameters(self):
upp_fnames = (
'TS48v5_SAIP2.1A_NoBERTLV.der',
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
'SAIP2.1_gfmsuci.der',
'TS48v5_SAIP2.1B_NoBERTLV.der',
'TS48v5_SAIP2.3_NoBERTLV.der',
)
class Paramtest:
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
iff_present_default = False
def __init__(self, param_cls, val, expect_val, expect_clean_val=None, iff_present=None):
self.param_cls = param_cls
self.val = val
self.expect_clean_val = expect_clean_val
self.expect_val = expect_val
if iff_present is None:
iff_present = Paramtest.iff_present_default
self.iff_present = iff_present
param_tests = [
Paramtest(param_cls=p13n.Imsi, val='123456',
@@ -267,7 +271,112 @@ class ConfigurableParameterTest(unittest.TestCase):
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
]
Paramtest(param_cls=p13n.MncLen,
val='2',
expect_clean_val=2,
expect_val='2'),
Paramtest(param_cls=p13n.MncLen,
val=3,
expect_clean_val=3,
expect_val='3'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceGetIdentity,
val='mandatory',
expect_clean_val=True,
expect_val='mandatory'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceGetIdentity,
val='optional',
expect_clean_val=False,
expect_val='optional'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceProfileA,
val='mandatory',
expect_clean_val=True,
expect_val='mandatory'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceProfileA,
val='optional',
expect_clean_val=False,
expect_val='optional'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceProfileB,
val='mandatory',
expect_clean_val=True,
expect_val='mandatory'),
Paramtest(param_cls=p13n.EuiccMandatoryServiceProfileB,
val='optional',
expect_clean_val=False,
expect_val='optional'),
]
Paramtest.iff_present_default = True
sucici = {
"prot_scheme_id_list": [
{"priority": 0, "identifier": 2, "key_index": 1},
{"priority": 1, "identifier": 1, "key_index": 2},
],
"hnet_pubkey_list": [
{"hnet_pubkey_identifier": 27,
"hnet_pubkey": "0472da71976234ce833a6907425867b82e074d44ef907dfb4b3e21c1c2256ebcd15a7ded52fcbb097a4ed250e036c7b9c8c7004c4eedc4f068cd7bf8d3f900e3b4"},
{"hnet_pubkey_identifier": 30,
"hnet_pubkey": "5a8d38864820197c3394b92613b20b91633cbd897119273bf8e4a6f4eec0a650"},
],
}
param_tests.extend([
Paramtest(param_cls=p13n.SuciActive, val='SUCI-on',
expect_clean_val=True,
expect_val={'5G-SUCI-active': 'SUCI-on'}),
Paramtest(param_cls=p13n.SuciActive, val='SUCI-off',
expect_clean_val=False,
expect_val={'5G-SUCI-active': 'SUCI-off'}),
Paramtest(param_cls=p13n.SuciInUsim, val='SUCI-in-UE',
expect_clean_val=False,
expect_val={'5G-SUCI-in-USIM': 'SUCI-in-UE'}),
Paramtest(param_cls=p13n.SuciInUsim, val='SUCI-in-USIM',
expect_clean_val=True,
expect_val={'5G-SUCI-in-USIM': 'SUCI-in-USIM'}),
Paramtest(param_cls=p13n.SuciRi, val='123',
expect_clean_val='123',
expect_val={'5G-SUCI-RI': '123'}),
Paramtest(param_cls=p13n.SuciRi, val='0',
expect_clean_val='0',
expect_val={'5G-SUCI-RI': '0'}),
Paramtest(param_cls=p13n.SuciRi, val='9999',
expect_clean_val='9999',
expect_val={'5G-SUCI-RI': '9999'}),
Paramtest(param_cls=p13n.SuciCalcInfoUe,
val=json.dumps(sucici),
expect_clean_val=sucici,
expect_val={'5G-SUCI-CalcInfo-UE': json.dumps(sucici)}),
Paramtest(param_cls=p13n.SuciCalcInfoUsim,
val=json.dumps(sucici),
expect_clean_val=sucici,
expect_val={'5G-SUCI-CalcInfo-USIM': json.dumps(sucici)}),
Paramtest(param_cls=p13n.GfmSuciRi, val='123',
expect_clean_val='123',
expect_val={'GFM-5G-SUCI-RI': '123'}),
Paramtest(param_cls=p13n.GfmSuciRi, val='0',
expect_clean_val='0',
expect_val={'GFM-5G-SUCI-RI': '0'}),
Paramtest(param_cls=p13n.GfmSuciRi, val='9999',
expect_clean_val='9999',
expect_val={'GFM-5G-SUCI-RI': '9999'}),
Paramtest(param_cls=p13n.GfmSuciCalcInfoUe,
val=json.dumps(sucici),
expect_clean_val=sucici,
expect_val={'GFM-5G-SUCI-CalcInfo-UE': json.dumps(sucici)}),
])
Paramtest.iff_present_default = False
for sdkey_cls in (
# thin out the number of tests, as a compromise between completeness and test runtime
@@ -310,14 +419,11 @@ class ConfigurableParameterTest(unittest.TestCase):
p13n.SdKeyScp80Kvn03DesDek,
#p13n.SdKeyScp80Kvn03DesEnc,
#p13n.SdKeyScp80Kvn03DesMac,
#p13n.SdKeyScp81Kvn40AesDek,
p13n.SdKeyScp81Kvn40DesDek,
p13n.SdKeyScp81Kvn40AesDek,
#p13n.SdKeyScp81Kvn40Tlspsk,
#p13n.SdKeyScp81Kvn41AesDek,
#p13n.SdKeyScp81Kvn41DesDek,
p13n.SdKeyScp81Kvn41Tlspsk,
#p13n.SdKeyScp81Kvn42AesDek,
#p13n.SdKeyScp81Kvn42DesDek,
#p13n.SdKeyScp81Kvn42Tlspsk,
):
@@ -363,7 +469,8 @@ class ConfigurableParameterTest(unittest.TestCase):
for t in param_tests:
test_idx += 1
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
testlog = []
testlog.append(f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})')
param = None
try:
@@ -371,21 +478,32 @@ class ConfigurableParameterTest(unittest.TestCase):
param.input_value = t.val
param.validate()
except ValueError as e:
raise ValueError(f'{logloc}: {e}') from e
raise ValueError(f'{" ".join(testlog)}: {e}') from e
clean_val = param.value
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
testlog.append(f'clean_val={valtypestr(clean_val)}')
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
raise ValueError(f'{logloc}: expected'
raise ValueError(f'{" ".join(testlog)}: expected'
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
# pes = copy.deepcopy(orig_pes)
pes = ProfileElementSequence.from_der(der)
found = list((t.param_cls.get_value_from_pes(pes) or {}).values())
testlog.append(f"previous value: {found}")
if t.iff_present and not found:
testlog.append("skipping, param not in template.")
output = "\nskip: " + "\n ".join(testlog)
outputs.append(output)
print(output)
continue
try:
param.apply(pes)
except ValueError as e:
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
raise ValueError(f'{" ".join(testlog)} apply_val(clean_val): {e}') from e
changed_der = pes.to_der()
@@ -403,22 +521,18 @@ class ConfigurableParameterTest(unittest.TestCase):
else:
read_back_val_type = f'{type(read_back_val).__name__}'
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
testlog.append(f'read_back_val={valtypestr(read_back_val)}')
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
raise ValueError(f'{" ".join(testlog)}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
expect_val = t.expect_val
if not isinstance(expect_val, dict):
expect_val = { t.param_cls.get_name(): expect_val }
if read_back_val != expect_val:
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
raise ValueError(f'{" ".join(testlog)}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
ok = logloc.replace(' clean_val', '\n\tclean_val'
).replace(' read_back_val', '\n\tread_back_val'
).replace('=', '=\t'
)
output = f'\nok: {ok}'
output = "\nok: " + "\n ".join(testlog)
outputs.append(output)
print(output)
@@ -444,6 +558,190 @@ class ConfigurableParameterTest(unittest.TestCase):
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
class TestValidateVal(unittest.TestCase):
"""validate_val() tests for various ConfigurableParameter subclasses."""
def _ok(self, cls, val, expected=None):
result = cls.validate_val(val)
if expected is not None:
self.assertEqual(result, expected)
return result
def _err(self, cls, val):
with self.assertRaises(ValueError):
cls.validate_val(val)
# --- Iccid ---
def test_iccid_18digits_adds_luhn(self):
result = self._ok(p13n.Iccid, '998877665544332211')
self.assertIsInstance(result, str)
self.assertEqual(len(result), 19)
self.assertTrue(result.isdecimal())
def test_iccid_19digits_passthrough(self):
result = self._ok(p13n.Iccid, '9988776655443322110')
self.assertIsInstance(result, str)
self.assertEqual(len(result), 19)
def test_iccid_too_short(self):
self._err(p13n.Iccid, '12345678901234567') # 17 digits
def test_iccid_too_long(self):
self._err(p13n.Iccid, '1' * 21)
def test_iccid_non_digits(self):
self._err(p13n.Iccid, '99887766554433221X')
# --- Imsi ---
def test_imsi_valid_short(self):
self._ok(p13n.Imsi, '001010', '001010')
def test_imsi_valid_long(self):
self._ok(p13n.Imsi, '001010123456789', '001010123456789')
def test_imsi_too_short(self):
self._err(p13n.Imsi, '12345') # 5 digits, min is 6
def test_imsi_too_long(self):
self._err(p13n.Imsi, '1' * 16)
def test_imsi_non_digits(self):
self._err(p13n.Imsi, '00101A123456789')
# --- Pin1 ---
def test_pin1_4digits(self):
# DecimalHexParam encodes each digit as its ASCII byte, then rpad to 8 bytes with 0xff
self._ok(p13n.Pin1, '1234', b'1234\xff\xff\xff\xff')
def test_pin1_8digits(self):
self._ok(p13n.Pin1, '12345678', b'12345678')
def test_pin1_too_short(self):
self._err(p13n.Pin1, '123')
def test_pin1_too_long(self):
self._err(p13n.Pin1, '123456789')
def test_pin1_non_digits(self):
self._err(p13n.Pin1, '123A')
# --- Puk1 ---
def test_puk1_8digits(self):
self._ok(p13n.Puk1, '12345678', b'12345678')
def test_puk1_wrong_length(self):
self._err(p13n.Puk1, '1234567') # 7 digits
self._err(p13n.Puk1, '123456789') # 9 digits
def test_puk1_non_digits(self):
self._err(p13n.Puk1, '1234567X')
# --- K (BinaryParam) ---
def test_k_valid_hex_str(self):
self._ok(p13n.K, '000102030405060708090a0b0c0d0e0f',
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f')
def test_k_valid_bytes(self):
raw = bytes(range(16))
self._ok(p13n.K, raw, raw)
def test_k_wrong_length(self):
self._err(p13n.K, '00' * 15) # 15 bytes, allow_len requires 16 or 32
def test_k_non_hex(self):
self._err(p13n.K, 'gg' * 16)
def test_k_odd_hex_digits(self):
self._err(p13n.K, '0' * 31) # odd number of hex digits
class TestEnumParam(unittest.TestCase):
"""Tests for the EnumParam machinery, using AlgorithmID as the concrete subclass."""
# --- validate_val ---
def test_validate_by_name_exact(self):
self.assertEqual(p13n.AlgorithmID.validate_val('Milenage'), 1)
self.assertEqual(p13n.AlgorithmID.validate_val('TUAK'), 2)
self.assertEqual(p13n.AlgorithmID.validate_val('usim-test'), 3)
def test_validate_by_int(self):
self.assertEqual(p13n.AlgorithmID.validate_val(1), 1)
self.assertEqual(p13n.AlgorithmID.validate_val(2), 2)
self.assertEqual(p13n.AlgorithmID.validate_val(3), 3)
def test_validate_fuzzy_case(self):
self.assertEqual(p13n.AlgorithmID.validate_val('milenage'), 1)
self.assertEqual(p13n.AlgorithmID.validate_val('MILENAGE'), 1)
self.assertEqual(p13n.AlgorithmID.validate_val('tuak'), 2)
def test_validate_fuzzy_hyphen_underscore(self):
self.assertEqual(p13n.AlgorithmID.validate_val('usim-test'), 3)
def test_validate_invalid_name(self):
with self.assertRaises(ValueError):
p13n.AlgorithmID.validate_val('unknown')
def test_validate_invalid_int(self):
with self.assertRaises(ValueError):
p13n.AlgorithmID.validate_val(99)
def test_validate_returns_int(self):
result = p13n.AlgorithmID.validate_val('Milenage')
self.assertIsInstance(result, int)
self.assertNotIsInstance(result, enum.Enum)
# --- map_name_to_val ---
def test_map_name_exact(self):
self.assertEqual(p13n.AlgorithmID.map_name_to_val('Milenage'), 1)
def test_map_name_fuzzy(self):
self.assertEqual(p13n.AlgorithmID.map_name_to_val('milenage'), 1)
self.assertEqual(p13n.AlgorithmID.map_name_to_val('usim-test'), 3)
def test_map_name_strict_raises(self):
with self.assertRaises(ValueError):
p13n.AlgorithmID.map_name_to_val('unknown', strict=True)
def test_map_name_nonstrict_returns_none(self):
self.assertIsNone(p13n.AlgorithmID.map_name_to_val('unknown', strict=False))
# --- map_val_to_name ---
def test_map_val_known(self):
self.assertEqual(p13n.AlgorithmID.map_val_to_name(1), 'Milenage')
self.assertEqual(p13n.AlgorithmID.map_val_to_name(2), 'TUAK')
self.assertEqual(p13n.AlgorithmID.map_val_to_name(3), 'usim-test')
def test_map_val_unknown_nonstrict(self):
self.assertIsNone(p13n.AlgorithmID.map_val_to_name(99))
def test_map_val_unknown_strict(self):
with self.assertRaises(ValueError):
p13n.AlgorithmID.map_val_to_name(99, strict=True)
# --- name_normalize ---
def test_name_normalize(self):
self.assertEqual(p13n.AlgorithmID.name_normalize('Milenage'), 'Milenage')
self.assertEqual(p13n.AlgorithmID.name_normalize('milenage'), 'Milenage')
self.assertEqual(p13n.AlgorithmID.name_normalize('usim-test'), 'usim-test')
# --- clean_name_str ---
def test_clean_name_str(self):
self.assertEqual(p13n.AlgorithmID.clean_name_str('usim-test'), 'usim-test')
self.assertEqual(p13n.AlgorithmID.clean_name_str('usim_test'), 'usim_test')
self.assertEqual(p13n.AlgorithmID.clean_name_str('Milenage'), 'milenage')
self.assertEqual(p13n.AlgorithmID.clean_name_str('foo bar!'), 'foobar')
if __name__ == "__main__":
if '-u' in sys.argv:
update_expected_output = True
@@ -0,0 +1,78 @@
#!/usr/bin/env python3
# (C) 2026 by sysmocom - s.f.m.c. GmbH
# All Rights Reserved
#
# Author: Philipp Maier <pmaier@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import os
from pySim.profile import CardProfile
from pySim.ts_51_011 import CardProfileSIM
from pySim.ts_102_221 import CardProfileUICC
class TestDecodeSelectResponse_CardProfile(unittest.TestCase):
def decode_select_response(self, card_Profile: CardProfile, testcases: list[dict]):
for testcase in testcases:
resp_hex = testcase['resp_hex']
decoded = card_Profile.decode_select_response(resp_hex)
if testcase['decoded']:
self.assertEqual(decoded, testcase['decoded'])
else:
print("no testvector to compare against, assuming the following output is correct:")
print("resp_hex:", resp_hex)
print("decoded:", decoded)
def test_CardProfileSIM(self):
testcases = [
# MF
{"resp_hex" : "000000003f000100000000000981020c0400838a838a",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'mf'}}, 'proprietary_info': {'available_memory': 0}, 'file_id': '3f00', 'file_characteristics': '81', 'num_direct_child_df': 2, 'num_direct_child_ef': 12, 'num_chv_unblock_adm_codes': 4}},
# DF.TELECOM
{"resp_hex" : "000000007f100200000000000981000d0400838a838a",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'df'}}, 'proprietary_info': {'available_memory': 0}, 'file_id': '7f10', 'file_characteristics': '81', 'num_direct_child_df': 0, 'num_direct_child_ef': 13, 'num_chv_unblock_adm_codes': 4}},
# EF.MSISDN
{"resp_hex" : "000000346f40040011ffff0102011a",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'working_ef', 'structure': 'linear_fixed'}, 'record_len': 26, 'num_of_rec': 2}, 'proprietary_info': {}, 'file_id': '6f40', 'file_size': 52, 'access_conditions': '11ffff', 'life_cycle_status_int': 'creation'}},
# EF.ICCID
{"resp_hex" : "0000000a2fe204000cffff01020000",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'file_type': 'working_ef', 'structure': 'transparent'}}, 'proprietary_info': {}, 'file_id': '2fe2', 'file_size': 10, 'access_conditions': '0cffff', 'life_cycle_status_int': 'creation'}},
]
self.decode_select_response(CardProfileSIM, testcases)
def test_CardProfileUICC(self):
testcases = [
# MF
{"resp_hex" : "622c8202782183023f00a50c80017183040003a7388701018a01058b032f0601c60c90016083010183010a83010b",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'?\x00', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'`'}, {'key_reference': 1}, {'key_reference': 10}, {'key_reference': 11}]}},
# ADF.USIM
{"resp_hex" : "623d8202782183027fd0840ca0000000871002ff49ff0589a50c80017183040003a7388701018a01058b032f0601c60f90017083010183018183010a83010b",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'\x7f\xd0', 'df_name': b'\xa0\x00\x00\x00\x87\x10\x02\xffI\xff\x05\x89', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'p'}, {'key_reference': 1}, {'key_reference': 129}, {'key_reference': 10}, {'key_reference': 11}]}},
# ADF.ISIM
{"resp_hex" : "623d8202782183027fb0840ca0000000871004ff49ff0589a50c80017183040003a7388701018a01058b032f0601c60f90017083010183018183010a83010b",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'df', 'structure': 'no_info_given'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'\x7f\xb0', 'df_name': b'\xa0\x00\x00\x00\x87\x10\x04\xffI\xff\x05\x89', 'proprietary_information': {'uicc_characteristics': b'q', 'available_memory': 239416, 'supported_filesystem_commands': {'terminal_capability': True}}, 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'/\x06', 'ef_arr_record_nr': 1}, 'pin_status_template_do': [{'ps_do': b'p'}, {'key_reference': 1}, {'key_reference': 129}, {'key_reference': 10}, {'key_reference': 11}]}},
# EF.IMSI
{"resp_hex" : "62178202412183026f078a01058b036f060a80020009880138",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'working_ef', 'structure': 'transparent'}, 'record_len': None, 'num_of_rec': None}, 'file_identifier': b'o\x07', 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'o\x06', 'ef_arr_record_nr': 10}, 'file_size': 9, 'short_file_identifier': 7}},
# EF.ECC
{"resp_hex" : "621a82054221000e0283026fb78a01058b036f06088002001c880108",
"decoded" : {'file_descriptor': {'file_descriptor_byte': {'shareable': True, 'file_type': 'working_ef', 'structure': 'linear_fixed'}, 'record_len': 14, 'num_of_rec': 2}, 'file_identifier': b'o\xb7', 'life_cycle_status_integer': 'operational_activated', 'security_attrib_referenced': {'ef_arr_file_id': b'o\x06', 'ef_arr_record_nr': 8}, 'file_size': 28, 'short_file_identifier': 1}},
]
self.decode_select_response(CardProfileUICC, testcases)
if __name__ == "__main__":
unittest.main()
+59 -69
View File
@@ -68,7 +68,7 @@ class ParamSourceTest(unittest.TestCase):
def test_param_source(self):
class ParamSourceTest(D):
class Paramtest(D):
mandatory = (
'param_source',
'n',
@@ -78,6 +78,11 @@ class ParamSourceTest(unittest.TestCase):
'expect_arg',
'csv_rows',
)
param_source: param_source.ParamSource
n: int
expect: object
expect_arg: object
csv_rows: object
def expect_const(t, vals):
return tuple(t.expect_arg) == tuple(vals)
@@ -100,74 +105,59 @@ class ParamSourceTest(unittest.TestCase):
return True
param_source_tests = [
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
n=3,
expect=expect_const,
expect_arg=('123', '123', '123')
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
n=3,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 5,
'val_maxlen': 5,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 1,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 3,
'val_maxlen': 3,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8,
},
),
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
n=3,
expect=expect_const,
expect_arg=('10001', '10002', '10003')
),
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
n=3,
expect=expect_const,
expect_arg=('first val', 'second val', 'third val'),
csv_rows=(
{'column_name': 'first val',},
{'column_name': 'second val',},
{'column_name': 'third val',},
)
),
Paramtest(param_source=param_source.ConstantSource.from_str('123'),
n=3,
expect=expect_const,
expect_arg=('123', '123', '123')),
Paramtest(param_source=param_source.RandomDigitSource.from_str('12345'),
n=3,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 5,
'val_maxlen': 5}),
Paramtest(param_source=param_source.RandomDigitSource.from_str('1..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 1,
'val_maxlen': 3}),
Paramtest(param_source=param_source.RandomDigitSource.from_str('001..999'),
n=10,
expect=expect_random,
expect_arg={'digits': decimals,
'val_minlen': 3,
'val_maxlen': 3}),
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8}),
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8}),
Paramtest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
n=3,
expect=expect_random,
expect_arg={'digits': hexadecimals,
'val_minlen': 8,
'val_maxlen': 8}),
Paramtest(param_source=param_source.IncDigitSource.from_str('10001'),
n=3,
expect=expect_const,
expect_arg=('10001', '10002', '10003')),
Paramtest(param_source=param_source.CsvSource('column_name'),
n=3,
expect=expect_const,
expect_arg=('first val', 'second val', 'third val'),
csv_rows=(
{'column_name': 'first val'},
{'column_name': 'second val'},
{'column_name': 'third val'},
)),
]
outputs = []
File diff suppressed because it is too large Load Diff