Compare commits

..

55 Commits

Author SHA1 Message Date
Neels Hofmeyr
f199dd6a8d ConfigurableParameter.get_typical_input_len: limit to 10 lines
Change-Id: Ia3d79e786f397a02bf2a8fafac5030d1198d9f76
2026-04-28 05:18:09 +02:00
Neels Hofmeyr
181c85d012 add SUCI parameters
Change-Id: I3c0793b8a67bbd0c8247784bd3b5cbd265f94ec2
2026-04-28 05:18:09 +02:00
Neels Hofmeyr
d28cf0a05e tweak test_configurable_parameters.py: add iff_present flag
apply a parameter only when it exists in the template, will be useful
for suci

Change-Id: I5811ecde4c4e880bb8dbd22fffe23faafcfe36ad
2026-04-25 06:13:02 +02:00
Neels Hofmeyr
a1a85c3214 tweak test_configurable_parameters.py: show value found in template
Change-Id: If9039bbb8547ee24ae784a932f60cd5de6c9247b
2026-04-25 06:06:37 +02:00
Neels Hofmeyr
f3760f7572 tweak test_configurable_parameters.py: saner output composition.
Change-Id: Id3e3b46b2b3d7919a75c620803ce28d2a715008b
2026-04-25 06:01:21 +02:00
Neels Hofmeyr
8b91249781 xo/test_configurable_parameters
Change-Id: I15573e801a62f94f0701637562e2d64a212041ca
2026-04-25 05:59:32 +02:00
Neels Hofmeyr
cc3f99b472 personalization: EF_SMSP: keep same length as found in template
Change-Id: Id24752101ae82c4986209f4103cc9cbdcce8ce1d
2026-04-23 23:50:12 +02:00
Neels Hofmeyr
d8c3d55c20 personalization: fix EF_SMSP length, alpha_id padding
The efFileSize needs to be updated and the alpha_id needs to be != None.

Change-Id: Ief6e02517f3e96158a2509d763b88aec4bd5a296
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
8333d6a340 saip: MncLen: fix for missing EF-AD
Change-Id: I7f5ce22d23808c91234c6a75b8a22264d3c5bc92
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
c28abecf8c saip.batch: log parameter errors
Change-Id: I6a46b2dc9018078ab8361226d1e6b50d3b4e1aaa
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
5fac39bb51 MncLen
Change-Id: I6c600faeab00ffb072acbe94c9a8b2d1397c07d3
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
61357d223e sdkeys kv40 aes
Change-Id: If5b53c840ebd1f224f9bb4706a602b415194f47b
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
66acb109ab esim/http_json_api.py: support text/plain response Content-Type
Allow returning text/plain Content-Types as 'data' output argument.

So far, all the esim/http_json_api functions require a JSON response.
However, a specific vendor has a list function where the request is JSON
but the response is text/plain CSV data. Allow and return in a dict.

Change-Id: Iba6e4cef1048b376050a435a900c0f395655a790
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
0c0a395f00 Revert "esim/http_json_api: extend JSON API with server functionality"
This reverts commit e00c0becca.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
ab07954999 Revert "esim/http_json_api: add missing apidoc"
This reverts commit 0a1c5a27d7.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
907c29735e Revert "http_json_api: Only require Content-Type if response body is non-empty"
This reverts commit e0a9e73267.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
e6aef652b0 Revert "esim/http_json_api: add alternative API interface"
This reverts commit f9d7c82b4d.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
09fbfdc39e Revert "esim/http_json_api: add alternative API interface (follow up)"
This reverts commit 8b2a49aa8e.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
6d0b3f8b85 Revert "esim/http_json_api: allow URL rewriting"
This reverts commit 0634f77308.
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
e0ac64d501 saip: add numeric_base indicator to ConfigurableParameter and ParamSource
By default, numeric_base = None, to indicate that there are no explicit
limitations on the number space.

For parameters that are definitely decimal, set numeric_base = 10.
For definitely hexadecimal, set numeric_base = 16.

Do the same for ConfigurableParameter as well as ParamSource, so callers
can match them up: if a parameter is numeric_base = 10, then omit
sources that are numeric_base = 16, and vice versa.

Change-Id: Ib0977bbdd9a85167be7eb46dd331fedd529dae01
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
f66b6fcc5b saip SmspTpScAddr.get_values_from_pes: allow empty values
Change-Id: Ibbdd08f96160579238b50699091826883f2e9f5a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
6f04e5e400 SdKey KVN4X ID02: set key_usage_qual=0x48
Related: SYS#7865
Change-Id: Idc5d33a4a003801f60c95fff6931706a9aeb6692
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
3470a3e062 saip: SdKey.__doc__: update SdKey listing
Change-Id: Ib5011b0c7d76b082231744cf09077628dc4e69b7
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
696da34e81 esim.saip.personalization: fix TLSPSK keys
Add AES variant of TLSPSK DEK (SCP81 KVN40 key_id=0x02).

Change-Id: I713a008fd26bbfcf437e0f29717b753f058ce76a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
42b8a70085 add comment about not updating existing key_usage_qualifier
Change-Id: Ie23ae5fde17be6b37746784bf1601b4d0874397a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
61264e32b8 test_configurable_parameters.py: add tests for new parameters
For:
SmspTpScAddr
MilenageRotation
MilenageXoringConstants
TuakNrOfKeccak

Change-Id: Iecbea14fe31a9ee08d871dcde7f295d26d7bd001
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
659cb5d6c4 saip: SmspTpScAddr: fix get_values_from_pes
Change-Id: I2010305340499c907bb7618c04c61e194db34814
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
c20b979875 ConfigurableParameter: safer val length check
Change-Id: Ibe91722ed1477b00d20ef5e4e7abd9068ff2f3e4
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
ddb5e3168e UppAudit: better indicate exception cause
Change-Id: I4d986b89a473a5b12ed56b4710263b034876a33e
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
ebeb00b48d remove transitional name mapping
This reverts commit I974cb6c393a2ed2248a6240c2722d157e9235c33

Now, finally, all SdKey classes have a unified logical naming scheme.

Change-Id: Ic185af4a903c2211a5361d023af9e7c6fc57ae78
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
69a74eb930 transitional name mapping
To help existing applications transition to a common naming scheme for
the SdKey classes, offer this intermediate result, where the SdKey
classes' .name are still unchanged as before generating them.

Change-Id: I974cb6c393a2ed2248a6240c2722d157e9235c33
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
c00813f66f generate sdkey classes from a list
Change-Id: Ic92ddea6e1fad8167ea75baf78ffc3eb419838c4
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
0b8b59e74f saip SmspTpScAddr: safeguard against decoding error
Reading the TS48 V6.0 eSIM_GTP_SAIP2.1A_NoBERTLV profile results in an
exception [1] in SmspTpScAddr. I have a caller that needs to skip
erratic values instead of raising.

The underlying issue, I presume, is that either the data needs
validation before decode_record_bin(), or decode_record_bin() needs
well-defined error handling.

So far I know only of this IndexError, so, as a workaround, catch that.

[1]
  File "/pysim/pySim/esim/saip/personalization.py", line 617, in get_values_from_pes
    ef_smsp_dec = ef_smsp.decode_record_bin(f_smsp.body, 1)
  File "/pysim/pySim/filesystem.py", line 1047, in decode_record_bin
    return parse_construct(self._construct, raw_bin_data)
  File "/application/venv/lib/python3.13/site-packages/osmocom/construct.py", line 550, in parse_construct
    parsed = c.parse(raw_bin_data, total_len=length, **context)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 404, in parse
    return self.parse_stream(io.BytesIO(data), **contextkw)
           ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 416, in parse_stream
    return self._parsereport(stream, context, "(parsing)")
           ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 428, in _parsereport
    obj = self._parse(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 2236, in _parse
    subobj = sc._parsereport(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 428, in _parsereport
    obj = self._parse(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 2770, in _parse
    return self.subcon._parsereport(stream, context, path)
           ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 428, in _parsereport
    obj = self._parse(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 2236, in _parse
    subobj = sc._parsereport(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 428, in _parsereport
    obj = self._parse(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 2770, in _parse
    return self.subcon._parsereport(stream, context, path)
           ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 428, in _parsereport
    obj = self._parse(stream, context, path)
  File "/application/venv/lib/python3.13/site-packages/construct/core.py", line 820, in _parse
    return self._decode(obj, context, path)
           ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
  File "/application/venv/lib/python3.13/site-packages/osmocom/construct.py", line 268, in _decode
    if r[-1] == 'f':
       ~^^^^
  File "/application/venv/lib/python3.13/site-packages/osmocom/utils.py", line 50, in __getitem__
    return hexstr(super().__getitem__(val))
                  ~~~~~~~~~~~~~~~~~~~^^^^^
IndexError: string index out of range

Change-Id: Ic436e206776b81f24de126e8ee0ae8bf5f3e8d7a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
09f2e50bd6 saip/param_source: try to not repeat random values
Change-Id: I4fa743ef5677580f94b9df16a5051d1d178edeb0
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
c712ecbb0e use secrets.SystemRandom as secure random nr source
secrets.SystemRandom is defined as the most secure random source
available on the given operating system.

Change-Id: I8049cd1292674b3ced82b0926569128535af6efe
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
69218d135f use random.SystemRandom as random nr source (/dev/urandom)
/dev/urandom is somewhat better than python's PRNG

Change-Id: I6de38c14ac6dd55bc84d53974192509c18d02bfa
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
405ccabafd add test_param_src.py
Change-Id: I03087b84030fddae98b965e0075d44e04ec6ba5c
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
86fecce1db param_source: allow plugging a random implementation (for testing)
Change-Id: Idce2b18af70c17844d6f09f7704efc869456ac39
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
a24dd6ac4a RandomHexDigitSource: rather return in string format, not bytes
Change-Id: I4e86289f6fb72cbd4cf0c90b8b49538cfab69a7f
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
350aa9e01f personalization: add int as input type for BinaryParameter
Change-Id: I31d8142cb0847a8b291f8dc614d57cb4734f0190
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
167783ef0a personalization.ConfigurableParameter: fix BytesIO() input
Change-Id: I0ad160eef9015e76eef10baee7c6b606fe249123
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
5dbe1d3360 add test_configurable_parameters.py
Change-Id: Ia55f0d11f8197ca15a948a83a34b3488acf1a0b4
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
ed5c032f76 ConfigurableParameter: do not magically overwrite the 'name' attribute
Change-Id: I6f631444c6addeb7ccc5f6c55b9be3dc83409169
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
f677a99471 personalization audit: optionally audit all (unknown) SD keys
By a flag, allow to audit also all Security Domain KVN that we have
*not* created ConfigurableParameter subclasses for.

For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only
Scp80Kvn01, Scp80Kvn02, Scp80Kvn03. So we would not show kvn
0x03..0x0f in an audit.

This patch includes audits of all SD key kvn there may be in the UPP.
This will help to spot SD keys that may already be present in a UPP
template, with unexpected / unusual kvn.

Change-Id: Icaf6f7b589f117868633c0968a99f2f0252cf612
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
566a578a63 personalization: implement UppAudit and BatchAudit
Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
475d98dcea param_source: allow input val expansion like '0 * 32'
Working with keys, we often generate 4, 8, 16, 32 digit wide random
values. Those then typically have default input values like

 00000000000000000000000000000000

it is hard for humans to count the number of digits. Much easier:

 00*16

Teach the ParamSource subclasses dealing with random values to
understand an expansion like this. Any expansion is carried out before
all other input value handling.

Use this expansion also in the default_value of ConfigurableParameter
subclasses that have a default_source pointing at a ParamSource that now
understand this expansion.

Related: SYS#6768
Change-Id: Ie7171c152a7b478736f8825050305606b5af5735
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
a0ceb319e4 comment in uicc.py on Security Domain Keys: add SCP81
Change-Id: Ib0205880f58e78c07688b4637abd5f67ea0570d1
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
f091d53cff personalization: fix SdKey.apply_val() implementation
'securityDomain' elements are decoded to ProfileElementSD instances,
which keep higher level representations of the key data apart from the
decoded[] lists.

So far, apply_val() was dropping binary values in decoded[], which does
not work, because ProfileElementSD._pre_encode() overwrites
self.decoded[] from the higher level representation.

Implement using
- ProfileElementSD.find_key() and SecurityDomainKeyComponent to modify
  an exsiting entry, or
- ProfileElementSD.add_key() to create a new entry.

Before this patch, SdKey parameters seemed to patch PES successfully,
but their modifications did not end up in the encoded DER.

(BTW, this does not fix any other errors that may still be present in
the various SdKey subclasses, patches coming up.)

Related: SYS#6768
Change-Id: I07dfc378705eba1318e9e8652796cbde106c6a52
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
f45fb769e1 personalization: add get_typical_input_len() to ConfigurableParameter
The aim is to tell a user interface how wide an input text field should
be chosen to be convenient -- ideally showing the entire value in all
cases, but not too huge for fields that have no sane size limit.

Change-Id: I2568a032167a10517d4d75d8076a747be6e21890
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
e293227ead personalization: make AlgorithmID a new EnumParam
The AlgorithmID has a few preset values, and hardly anyone knows which
is which. So instead of entering '1', '2' or '3', make it work with
prededined values 'Milenage', 'TUAK' and 'usim-test'.

Implement the enum value part abstractly in new EnumParam.

Make AlgorithmID a subclass of EnumParam and define the values as from
pySim/esim/asn1/saip/PE_Definitions-3.3.1.asn

Related: SYS#6768
Change-Id: I71c2ec1b753c66cb577436944634f32792353240
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
a3a124acea personalization: indicate default ParamSource per ConfigurableParameter
Add default_source class members pointing to ParamSource classes to all
ConfigurableParameter subclasses.

This is useful to automatically set up a default ParamSource for a given
ConfigurableParameter subclass, during user interaction to produce a
batch personalization.

For example, if the user selects a Pin1 parameter, a calling program can
implicitly set this to a RandomDigitSource, which will magically make it
work the way that most users need.

BTW, default_source and default_value can be combined to configure a
matching ParamSource instance:

  my_source = MyParam.default_source.from_str( MyParam.default_value )

Change-Id: Ie58d13bce3fa1aa2547cf3cee918c2f5b30a8b32
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
f4dd30dcf4 personalization: allow reading back multiple values from PES
Change-Id: Iecb68af7c216c6b9dc3add469564416b6f37f7b2
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
8c58834697 personalization: implement reading back values from a PES
Implement get_values_from_pes(), the reverse direction of apply_val():
read back and return values from a ProfileElementSequence. Implement for
all ConfigurableParameter subclasses.

Future: SdKey.get_values_from_pes() is reading pe.decoded[], which works
fine, but I07dfc378705eba1318e9e8652796cbde106c6a52 will change this
implementation to use the higher level ProfileElementSD members.

Implementation detail:

Implement get_values_from_pes() as classmethod that returns a generator.
Subclasses should yield all occurences of their parameter in a given
PES.

For example, the ICCID can appear in multiple places.
Iccid.get_values_from_pes() yields all of the individual values. A set()
of the results quickly tells whether the PES is consistent.

Rationales for reading back values:

This allows auditing an eSIM profile, particularly for producing an
output.csv from a batch personalization (that generated lots of random
key material which now needs to be fed to an HLR...).

Reading back from a binary result is more reliable than storing the
values that were fed into a personalization.
By auditing final DER results with this code, I discovered:
- "oh, there already was some key material in my UPP template."
- "all IMSIs ended up the same, forgot to set up the parameter."
- the SdKey.apply() implementations currently don't work, see
  I07dfc378705eba1318e9e8652796cbde106c6a52 for a fix.

Change-Id: I234fc4317f0bdc1a486f0cee4fa432c1dce9b463
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
9325a04952 personalization: add param_source.py, add batch.py
Implement pySim.esim.saip.batch.BatchPersonalization,
generating N eSIM profiles from a preset configuration.

Batch parameters can be fed by a constant, incrementing, random or from
CSV rows: add pySim.esim.saip.param_source.* classes to feed such input
to each of the BatchPersonalization's ConfigurableParameter instances.

Related: SYS#6768
Change-Id: I01ae40a06605eb205bfb409189fcd2b3a128855a
2026-04-22 20:12:13 +02:00
Neels Hofmeyr
2128cd3c5b ts_31_102.py: EF_SUCI_Calc_Info(TransparentEF): fix len test
while len(foo):

throws an exception when foo == None.
Instead doing

    while foo:

fixes a problem when reading in empty SUCI calc info data, e.g. from
TS48v7.0_SAIP2.3_BERTLV_SUCI_NoRAMRFM.der.

Change-Id: Ia4e2356d0241d7a6ca399ba7e8be7f27ec836104
2026-04-22 20:12:13 +02:00
9 changed files with 3943 additions and 3026 deletions

View File

@@ -16,12 +16,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests
from klein import Klein
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
from twisted.internet.posixbase import PosixReactorBase
from pathlib import Path
from twisted.web.server import Site, Request
import logging
from datetime import datetime
import time
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
class DownloadOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/downloadOrder'
input_params = {
'header': JsonRequestHeader,
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType
}
input_mandatory = ['header']
output_params = {
'header': JsonResponseHeader,
'iccid': param.Iccid,
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
class ConfirmOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/confirmOrder'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
'smdsAddress': param.SmdsAddress,
'releaseFlag': param.ReleaseFlag,
}
input_mandatory = ['header', 'iccid', 'releaseFlag']
input_mandatory = ['iccid', 'releaseFlag']
output_params = {
'header': JsonResponseHeader,
'eid': param.Eid,
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
class CancelOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/cancelOrder'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
}
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
output_params = {
'header': JsonResponseHeader,
}
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
class ReleaseProfile(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/releaseProfile'
input_params = {
'header': JsonRequestHeader,
'iccid': param.Iccid,
}
input_mandatory = ['header', 'iccid']
input_mandatory = ['iccid']
output_params = {
'header': JsonResponseHeader,
}
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
class HandleDownloadProgressInfo(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
input_params = {
'header': JsonRequestHeader,
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType,
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
'notificationPointStatus': param.NotificationPointStatus,
'resultData': param.ResultData,
}
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
expected_http_status = 204
class Es2pApiClient:
"""Main class representing a full ES2+ API client. Has one method for each API function."""
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
@@ -217,17 +206,18 @@ class Es2pApiClient:
if client_cert:
self.session.cert = client_cert
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
def _gen_func_id(self) -> str:
"""Generate the next function call id."""
self.func_id += 1
return 'FCI-%u-%u' % (time.time(), self.func_id)
def call_downloadOrder(self, data: dict) -> dict:
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
return self.downloadOrder.call(data, self._gen_func_id())
@@ -247,116 +237,3 @@ class Es2pApiClient:
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
class Es2pApiServerHandlerSmdpp(abc.ABC):
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
@abc.abstractmethod
def call_downloadOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
pass
@abc.abstractmethod
def call_confirmOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
pass
@abc.abstractmethod
def call_cancelOrder(self, data: dict) -> (dict, str):
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
pass
@abc.abstractmethod
def call_releaseProfile(self, data: dict) -> (dict, str):
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
pass
class Es2pApiServerHandlerMno(abc.ABC):
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
@abc.abstractmethod
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
pass
class Es2pApiServer(abc.ABC):
"""Main class representing a full ES2+ API server. Has one method for each API function."""
app = None
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
self.port = port
self.interface = interface
if server_cert:
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
else:
self.server_cert = None
if client_cert_verify:
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
else:
self.client_cert_verify = None
def reactor(self, reactor: PosixReactorBase):
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
if self.server_cert:
if self.client_cert_verify:
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
interface=self.interface)
else:
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
interface=self.interface)
else:
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
return defer.Deferred()
class Es2pApiServerSmdpp(Es2pApiServer):
"""ES2+ (SMDP+ side) API Server."""
app = Klein()
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
server_cert: str = None, client_cert_verify: str = None):
super().__init__(port, interface, server_cert, client_cert_verify)
self.handler = handler
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
task.react(self.reactor)
@app.route(DownloadOrder.path)
def call_downloadOrder(self, request: Request) -> dict:
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
return self.downloadOrder.call(request)
@app.route(ConfirmOrder.path)
def call_confirmOrder(self, request: Request) -> dict:
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
return self.confirmOrder.call(request)
@app.route(CancelOrder.path)
def call_cancelOrder(self, request: Request) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
return self.cancelOrder.call(request)
@app.route(ReleaseProfile.path)
def call_releaseProfile(self, request: Request) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
return self.releaseProfile.call(request)
class Es2pApiServerMno(Es2pApiServer):
"""ES2+ (MNO side) API Server."""
app = Klein()
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
server_cert: str = None, client_cert_verify: str = None):
super().__init__(port, interface, server_cert, client_cert_verify)
self.handler = handler
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
handler.call_handleDownloadProgressInfo)
task.react(self.reactor)
@app.route(HandleDownloadProgressInfo.path)
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
return self.handleDownloadProgressInfo.call(request)

View File

@@ -155,11 +155,11 @@ class Es9pApiClient:
if server_cert_verify:
self.session.verify = server_cert_verify
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
self.handleNotification = HandleNotification(url_prefix, '', self.session)
self.cancelSession = CancelSession(url_prefix, '', self.session)
def call_initiateAuthentication(self, data: dict) -> dict:
return self.initiateAuthentication.call(data)

View File

@@ -19,10 +19,8 @@ import abc
import requests
import logging
import json
from typing import Optional, Tuple
from typing import Optional
import base64
from twisted.web.server import Request
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
raise ValueError('Unknown/unspecified status "%s"' % status)
class JsonRequestHeader(ApiParam):
"""SGP.22 section 6.5.1.3."""
@classmethod
def verify_decoded(cls, data):
func_req_id = data.get('functionRequesterIdentifier')
if not func_req_id:
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
func_call_id = data.get('functionCallIdentifier')
if not func_call_id:
raise ValueError('Missing mandatory functionCallIdentifier in header')
class HttpStatusError(Exception):
pass
@@ -173,118 +161,65 @@ class ApiError(Exception):
class JsonHttpApiFunction(abc.ABC):
"""Base class for representing an HTTP[s] API Function."""
# The below class variables are used to describe the properties of the API function. Derived classes are expected
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
# prefix.
# the below class variables are expected to be overridden in derived classes
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder', see also method rewrite_url).
path = None
# dictionary of input parameters. key is parameter name, value is ApiParam class
input_params = {}
# list of mandatory input parameters
input_mandatory = []
# dictionary of output parameters. key is parameter name, value is ApiParam class
output_params = {}
# list of mandatory output parameters (for successful response)
output_mandatory = []
# list of mandatory output parameters (for failed response)
output_mandatory_failed = []
# expected HTTP status code of the response
expected_http_status = 200
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
http_method = 'POST'
# additional custom HTTP headers (client requests)
extra_http_req_headers = {}
# additional custom HTTP headers (server responses)
extra_http_res_headers = {}
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
self.url_prefix = url_prefix
self.func_req_id = func_req_id
self.session = session
def __new__(cls, *args, role = 'legacy_client', **kwargs):
"""
Args:
args: (see JsonHttpApiClient and JsonHttpApiServer)
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
"""
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
# decode_ methods below). The dictionary will not include any dunder/magic methods
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
# Normal instantiation as JsonHttpApiFunction:
if len(args) == 0 and len(kwargs) == 0:
return type(cls.__name__, (abc.ABC,), cls_attr)()
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
if role == 'legacy_client':
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
result.api_func = result
result.legacy = True
return result
elif role == 'client':
# Create a JsonHttpApiFunction in client role
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
result.api_func = result
return result
elif role == 'server':
# Create a JsonHttpApiFunction in server role
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
result.api_func = result
return result
else:
raise ValueError('Invalid role \'%s\' specified' % role)
def encode_client(self, data: dict) -> dict:
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
"""Validate an encode input dict into JSON-serializable dict for request body."""
output = {}
if func_call_id:
output['header'] = {
'functionRequesterIdentifier': self.func_req_id,
'functionCallIdentifier': func_call_id
}
for p in self.input_mandatory:
if not p in data:
raise ValueError('Mandatory input parameter %s missing' % p)
for p, v in data.items():
p_class = self.input_params.get(p)
if not p_class:
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
# pylint: disable=no-member
if hasattr(self, 'legacy') and self.legacy:
output[p] = JsonRequestHeader.encode(v)
else:
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
output[p] = v
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
output[p] = v
else:
output[p] = p_class.encode(v)
return output
def decode_client(self, data: dict) -> dict:
def decode(self, data: dict) -> dict:
"""[further] Decode and validate the JSON-Dict of the response body."""
output = {}
output_mandatory = self.output_mandatory
if 'header' in self.output_params:
# let's first do the header, it's special
if not 'header' in data:
raise ValueError('Mandatory output parameter "header" missing')
hdr_class = self.output_params.get('header')
output['header'] = hdr_class.decode(data['header'])
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
# different set of mandatory parameters applies.
header = data.get('header')
if header:
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
output_mandatory = self.output_mandatory_failed
for p in output_mandatory:
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
raise ApiError(output['header']['functionExecutionStatus'])
# we can only expect mandatory parameters to be present in case of successful execution
for p in self.output_mandatory:
if p == 'header':
continue
if not p in data:
raise ValueError('Mandatory output parameter "%s" missing' % p)
for p, v in data.items():
@@ -296,195 +231,35 @@ class JsonHttpApiFunction(abc.ABC):
output[p] = p_class.decode(v)
return output
def encode_server(self, data: dict) -> dict:
"""Validate an encode input dict into JSON-serializable dict for response body."""
output = {}
output_mandatory = self.output_mandatory
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
# different set of mandatory parameters applies.
header = data.get('header')
if header:
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
output_mandatory = self.output_mandatory_failed
for p in output_mandatory:
if not p in data:
raise ValueError('Mandatory output parameter %s missing' % p)
for p, v in data.items():
p_class = self.output_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
output[p] = v
else:
output[p] = p_class.encode(v)
return output
def decode_server(self, data: dict) -> dict:
"""[further] Decode and validate the JSON-Dict of the request body."""
output = {}
for p in self.input_mandatory:
if not p in data:
raise ValueError('Mandatory input parameter "%s" missing' % p)
for p, v in data.items():
p_class = self.input_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
output[p] = v
else:
output[p] = p_class.decode(v)
return output
def rewrite_url(self, data: dict, url: str) -> Tuple[dict, str]:
"""
Rewrite a static URL using information passed in the data dict. This method may be overloaded by a derived
class to allow fully dynamic URLs. The input parameters required for the URL rewriting may be passed using
data parameter. In case those parameters are additional parameters that are not intended to be passed to
the encode_client method later, they must be removed explcitly.
Args:
data: (see JsonHttpApiClient and JsonHttpApiServer)
url: statically generated URL string (see comment in JsonHttpApiClient)
"""
# This implementation is a placeholder in which we do not perform any URL rewriting. We just pass through data
# and url unmodified.
return data, url
class JsonHttpApiClient():
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
session: requests.Session):
"""
Args:
api_func : API function definition (JsonHttpApiFunction)
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
func_req_id : function requestor id to use for requests
session : session object (requests)
"""
self.api_func = api_func
self.url_prefix = url_prefix
self.func_req_id = func_req_id
self.session = session
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
"""
Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
json-serializable fields. `data` may also contain additional parameters required for URL rewriting (see
rewrite_url in class JsonHttpApiFunction). Output data is returned as json-deserialized dict.
Args:
data: Input data required to perform the request.
func_call_id: Function Call Identifier, if present a header field is generated automatically.
timeout: Maximum amount of time to wait for the request to complete.
"""
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
# field is checked by the encode_client method)
if func_call_id:
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
'functionCallIdentifier': func_call_id}} | data
# The URL used for the HTTP request (see below) normally consists of the initially given url_prefix
# concatenated with the path defined by the JsonHttpApiFunction definition. This static URL path may be
# rewritten by rewrite_url method defined in the JsonHttpApiFunction.
data, url = self.api_func.rewrite_url(data, self.url_prefix + self.api_func.path)
# Encode the message (the presence of mandatory fields is checked during encoding)
encoded = json.dumps(self.api_func.encode_client(data))
# Apply HTTP request headers according to SGP.22, section 6.5.1
"""Make an API call to the HTTP API endpoint represented by this object.
Input data is passed in `data` as json-serializable dict. Output data
is returned as json-deserialized dict."""
url = self.url_prefix + self.path
encoded = json.dumps(self.encode(data, func_call_id))
req_headers = {
'Content-Type': 'application/json',
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
}
req_headers.update(self.api_func.extra_http_req_headers)
req_headers.update(self.extra_http_req_headers)
# Perform HTTP request
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
logger.debug("HTTP RSP: %s" % (response.content))
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
# SGP.22, section 6.5.1)
if response.status_code != self.api_func.expected_http_status:
if response.status_code != self.expected_http_status:
raise HttpStatusError(response)
if response.content and not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
raise HttpHeaderError(response)
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
raise HttpHeaderError(response)
# Decode response and return the result back to the caller
if response.content:
output = self.api_func.decode_client(response.json())
# In case the response contains a header, check it to make sure that the API call was executed successfully
# (the presence of the header field is checked by the decode_client method)
if 'header' in output:
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
raise ApiError(output['header']['functionExecutionStatus'])
return output
if response.headers.get('Content-Type').startswith('application/json'):
return self.decode(response.json())
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
return { 'data': response.content.decode('utf-8') }
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
return None
class JsonHttpApiServer():
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
"""
Args:
api_func : API function definition (JsonHttpApiFunction)
call_handler : handler function to process the request. This function must accept the
decoded request as a dictionary. The handler function must return a tuple consisting
of the response in the form of a dictionary (may be empty), and a function execution
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
"""
self.api_func = api_func
if call_handler:
self.call_handler = call_handler
else:
self.call_handler = self.default_handler
def default_handler(self, data: dict) -> (dict, str):
"""default handler, used in case no call handler is provided."""
logger.error("no handler function for request: %s" % str(data))
return {}, 'Failed'
def call(self, request: Request) -> str:
""" Process an incoming request.
Args:
request : request object as received using twisted.web.server
Returns:
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
provided the request object)
"""
# Make sure the request is done with the correct HTTP method
if (request.method.decode() != self.api_func.http_method):
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
# Decode the request
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
# Run call handler (see above)
data, fe_status = self.call_handler(decoded_request)
# In case a function execution status is returned, use it to generate and prepend the header field according to
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
if fe_status:
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
# Encode the message (the presence of mandatory fields is checked during encoding)
encoded = json.dumps(self.api_func.encode_server(data))
# Apply HTTP request headers according to SGP.22, section 6.5.1
res_headers = {
'Content-Type': 'application/json',
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
}
res_headers.update(self.api_func.extra_http_res_headers)
for header, value in res_headers.items():
request.setHeader(header, value)
request.setResponseCode(self.api_func.expected_http_status)
# Return the encoded result back to the caller for sending (using twisted/klein)
return encoded

View File

@@ -20,6 +20,9 @@
import copy
import pprint
import logging
import traceback
import inspect
from typing import List, Generator
from pySim.esim.saip.personalization import ConfigurableParameter
from pySim.esim.saip import param_source
@@ -27,6 +30,10 @@ from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
from pySim.global_platform import KeyUsageQualifier
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
def _func_():
return inspect.currentframe().f_back.f_code.co_name
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
@@ -57,18 +64,15 @@ class BatchPersonalization:
"""
class ParamAndSrc:
"""tie a ConfigurableParameter to a source of actual values"""
'tie a ConfigurableParameter to a source of actual values'
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
if isinstance(param, type):
self.param_cls = param
else:
self.param_cls = param.__class__
self.param = param
self.src = src
def __init__(self,
n: int,
src_pes: ProfileElementSequence,
params: list[ParamAndSrc]=[],
params: list[ParamAndSrc]=None,
csv_rows: Generator=None,
):
"""
@@ -77,10 +81,10 @@ class BatchPersonalization:
copied.
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
profile values.
csv_rows: A generator (e.g. iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
param_source.CsvSource.
csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column
headers. This is compatible with the python csv.reader. Each row gets passed to
ParamSource.get_next(), such that ParamSource implementations can access the row items.
See param_source.CsvSource.
"""
self.n = n
self.params = params or []
@@ -88,7 +92,7 @@ class BatchPersonalization:
self.csv_rows = csv_rows
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
self.params.append(BatchPersonalization.ParamAndSrc(param, src))
self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
def generate_profiles(self):
# get first row of CSV: column names
@@ -115,10 +119,12 @@ class BatchPersonalization:
try:
input_value = p.src.get_next(csv_row=csv_row)
assert input_value is not None
value = p.param_cls.validate_val(input_value)
p.param_cls.apply_val(pes, value)
value = p.param.__class__.validate_val(input_value)
p.param.__class__.apply_val(pes, value)
except Exception as e:
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
print(traceback.format_exc())
logger.error('during %s: %r', _func_(), e)
raise ValueError(f'{p.param.name} fed by {p.src.name}: {e!r}') from e
yield pes
@@ -132,7 +138,7 @@ class UppAudit(dict):
@classmethod
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
'''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
both 'IMSI' and 'IMSI-ACC' parameters.
@@ -154,7 +160,7 @@ class UppAudit(dict):
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
unexpected / unusual kvn.
"""
'''
# make an instance of this class
upp_audit = cls()
@@ -320,7 +326,7 @@ class BatchAudit(list):
return batch_audit
def to_csv_rows(self, headers=True, sort_key=None):
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
columns = set()
for audit in self:
columns.update(audit.keys())

View File

@@ -37,10 +37,13 @@ class ParamSource:
name = "none"
numeric_base = None # or 10 or 16
def __init__(self, input_str:str):
"""Subclasses should call super().__init__(input_str) before evaluating self.input_str. Each subclass __init__()
may in turn manipulate self.input_str to apply expansions or decodings."""
self.input_str = input_str
@classmethod
def from_str(cls, s:str):
"""Subclasses implement this:
if a parameter source defines some string input magic, override this function.
For example, a RandomDigitSource derives the number of digits from the string length,
so the user can enter '0000' to get a four digit random number."""
return cls(s)
def get_next(self, csv_row:dict=None):
"""Subclasses implement this: return the next value from the parameter source.
@@ -48,81 +51,78 @@ class ParamSource:
This default implementation is an empty source."""
raise ParamSourceExhaustedExn()
@classmethod
def from_str(cls, input_str:str):
"""compatibility with earlier version of ParamSource. Just use the constructor."""
return cls(input_str)
class ConstantSource(ParamSource):
"""one value for all"""
name = "constant"
def __init__(self, val:str):
self.val = val
def get_next(self, csv_row:dict=None):
return self.input_str
return self.val
class InputExpandingParamSource(ParamSource):
def __init__(self, input_str:str):
super().__init__(input_str)
self.input_str = self.expand_input_str(self.input_str)
@classmethod
def expand_input_str(cls, input_str:str):
def expand_str(cls, s:str):
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
if "*" not in input_str:
return input_str
# re: "XX * 123" with optional spaces
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", input_str)
if "*" not in s:
return s
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
if len(tokens) < 3:
return input_str
return s
parts = []
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
parts.append(unchanged)
repeat = int(repeat_str)
parts.append(snippet * repeat)
return "".join(parts)
@classmethod
def from_str(cls, s:str):
return cls(cls.expand_str(s))
class DecimalRangeSource(InputExpandingParamSource):
"""abstract: decimal numbers with a value range"""
numeric_base = 10
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
"""Constructor to set up values from a (user entered) string: DecimalRangeSource(input_str).
Constructor to set up values directly: DecimalRangeSource(num_digits=3, first_value=123, last_value=456)
num_digits produces leading zeros when first_value..last_value are shorter.
def __init__(self, num_digits, first_value, last_value):
"""
assert ((input_str is not None and (num_digits, first_value, last_value) == (None, None, None))
or (input_str is None and None not in (num_digits, first_value, last_value)))
if input_str is not None:
super().__init__(input_str)
input_str = self.input_str
if ".." in input_str:
first_str, last_str = input_str.split('..')
first_str = first_str.strip()
last_str = last_str.strip()
else:
first_str = input_str.strip()
last_str = None
num_digits = len(first_str)
first_value = int(first_str)
last_value = int(last_str if last_str is not None else "9" * num_digits)
See also from_str().
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
num_digits: fixed number of digits (possibly with leading zeros) to generate.
first_value, last_value: the decimal range in which to provide digits.
"""
num_digits = int(num_digits)
first_value = int(first_value)
last_value = int(last_value)
assert num_digits > 0
assert first_value <= last_value
self.num_digits = num_digits
self.first_value = first_value
self.last_value = last_value
self.val_first_last = (first_value, last_value)
def val_to_digit(self, val:int):
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
@classmethod
def from_str(cls, s:str):
s = cls.expand_str(s)
if ".." in s:
first_str, last_str = s.split('..')
first_str = first_str.strip()
last_str = last_str.strip()
else:
first_str = s.strip()
last_str = None
first_value = int(first_str)
last_value = int(last_str) if last_str is not None else "9" * len(first_str)
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
class RandomSourceMixin:
random_impl = secrets.SystemRandom()
@@ -135,7 +135,7 @@ class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
# try to generate random digits that are always different from previously produced random bytes
attempts = 10
while True:
val = self.random_impl.randint(self.first_value, self.last_value)
val = self.random_impl.randint(*self.val_first_last)
if val in RandomDigitSource.used_keys:
attempts -= 1
if attempts:
@@ -150,11 +150,9 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
numeric_base = 16
used_keys = set()
def __init__(self, input_str:str):
super().__init__(input_str)
input_str = self.input_str
num_digits = len(input_str.strip())
def __init__(self, num_digits):
"""see from_str()"""
num_digits = int(num_digits)
if num_digits < 1:
raise ValueError("zero number of digits")
# hex digits always come in two
@@ -176,20 +174,23 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
return b2h(val)
@classmethod
def from_str(cls, s:str):
s = cls.expand_str(s)
return cls(num_digits=len(s.strip()))
class IncDigitSource(DecimalRangeSource):
"""incrementing sequence of digits"""
name = "incrementing decimal digits"
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
"""input_str: the first value to return, a string of an integer number with optional leading zero digits. The
leading zero digits are preserved."""
super().__init__(input_str, num_digits, first_value, last_value)
def __init__(self, num_digits, first_value, last_value):
super().__init__(num_digits, first_value, last_value)
self.next_val = None
self.reset()
def reset(self):
"""Restart from the first value of the defined range passed to __init__()."""
self.next_val = self.first_value
self.next_val = self.val_first_last[0]
def get_next(self, csv_row:dict=None):
val = self.next_val
@@ -199,7 +200,7 @@ class IncDigitSource(DecimalRangeSource):
returnval = self.val_to_digit(val)
val += 1
if val > self.last_value:
if val > self.val_first_last[1]:
self.next_val = None
else:
self.next_val = val
@@ -210,15 +211,13 @@ class CsvSource(ParamSource):
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
name = "from CSV"
def __init__(self, input_str:str):
"""self.csv_column = input_str:
column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
def __init__(self, csv_column):
"""
"""Parse input_str into self.num_digits, self.first_value, self.last_value."""
super().__init__(input_str)
self.csv_column = self.input_str
csv_column: column name indicating the column to use for this parameter.
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
CsvSource picks the column with the name matching csv_column.
"""
self.csv_column = csv_column
def get_next(self, csv_row:dict=None):
val = None

View File

@@ -20,13 +20,14 @@ import io
import os
import re
import pprint
import json
from typing import List, Tuple, Generator, Optional
from construct.core import StreamError
from osmocom.tlv import camel_to_snake
from osmocom.utils import hexstr
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
from pySim.ts_31_102 import EF_AD
from pySim.ts_31_102 import EF_AD, EF_UST, EF_Routing_Indicator, EF_SUCI_Calc_Info
from pySim.ts_51_011 import EF_SMSP
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
@@ -291,7 +292,9 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
May be overridden by subclasses.
This default implementation returns the maximum allowed value length -- a good fit for most subclasses.
'''
return cls.get_len_range()[1] or 16
l = cls.get_len_range()[1] or 16
l = min(10*80, l)
return l
@classmethod
def is_super_of(cls, other_class):
@@ -330,7 +333,6 @@ class DecimalHexParam(DecimalParam):
@classmethod
def validate_val(cls, val):
val = super().validate_val(val)
assert isinstance(val, str)
val = ''.join('%02x' % ord(x) for x in val)
if cls.rpad is not None:
c = cls.rpad_char
@@ -340,7 +342,7 @@ class DecimalHexParam(DecimalParam):
@classmethod
def decimal_hex_to_str(cls, val):
"""useful for get_values_from_pes() implementations of subclasses"""
'useful for get_values_from_pes() implementations of subclasses'
if isinstance(val, bytes):
val = b2h(val)
assert isinstance(val, hexstr)
@@ -634,13 +636,14 @@ class SmspTpScAddr(ConfigurableParameter):
# - To generate the right amount of fillFileContent, pass total_len=42 to encode_record_bin().
# - To show the right size in the PES, set f_smsp.rec_len = 42
ef_smsp_dec['alpha_id'] = ''
f_smsp.rec_len = 42
# we can set this to choose a fixed length:
#f_smsp.rec_len = 42
# but leave rec_len unchanged to keep the same length as was found in the eSIM template.
# re-encode into the File body.
#
#print("SMSP (new): %s" % f_smsp.body)
# re-generate the pe.decoded member from the File instance
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1, total_len=f_smsp.rec_len)
# re-generate the pe.decoded member from the File instance
pe.file2pe(f_smsp)
@classmethod
@@ -692,8 +695,10 @@ class MncLen(ConfigurableParameter):
for pe in pes.get_pes_for_type('usim'):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad')
if not f_ad:
continue
# decode existing values
f_ad = pe.files['ef-ad']
if not f_ad.body:
continue
try:
@@ -711,25 +716,24 @@ class MncLen(ConfigurableParameter):
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for naa in ('isim',):# 'isim', 'csim'):
for pe in pes.get_pes_for_type(naa):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad', None)
if f_ad is None:
continue
for pe in pes.get_pes_for_type('usim'):
if not hasattr(pe, 'files'):
continue
f_ad = pe.files.get('ef-ad', None)
if f_ad is None:
continue
try:
ef_ad = EF_AD()
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
except StreamError:
continue
try:
ef_ad = EF_AD()
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
except StreamError:
continue
mnc_len = ef_ad_dec.get('mnc_len', None)
if mnc_len is None:
continue
mnc_len = ef_ad_dec.get('mnc_len', None)
if mnc_len is None:
continue
yield { cls.name: str(mnc_len) }
yield { cls.name: str(mnc_len) }
class SdKey(BinaryParam):
@@ -1193,3 +1197,184 @@ class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
max_val = 255
example_input = '1'
default_source = param_source.ConstantSource
class EfUstServiceParam(EnumParam):
"""superclass for EF-UST service flag parameters"""
service_idx = 0
value_map = { 'enabled': True, 'disabled': False }
default_source = param_source.ConstantSource
example_input = sorted(value_map.keys())[0]
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
for pe in pes.get_pes_for_type('usim'):
f_ust = pe.files['ef-ust']
ef_ust = EF_UST()
ust = ef_ust.decode_bin(f_ust.body)
ust[cls.service_idx]['activated'] = val
f_ust.body = ef_ust.encode_bin(ust)
pe.file2pe(f_ust)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('usim'):
f_ust = pe.files.get('ef-ust', None)
if not f_ust:
continue
ef_ust = EF_UST()
try:
ust = ef_ust.decode_bin(f_ust.body)
service_flag = ust[cls.service_idx]['activated']
yield { cls.name: cls.map_val_to_name(service_flag) }
except:
pass
class SuciActive(EfUstServiceParam):
"""EF-UST service nr 124: enable or disable the SUCI service."""
service_idx = 124
name = '5G-SUCI-active'
value_map = { 'SUCI-on': True, 'SUCI-off': False }
example_input = sorted(value_map.keys())[0]
class SuciInUsim(EfUstServiceParam):
"""EF-UST service nr 125: calculate SUCI in UE or in USIM"""
service_idx = 125
name = '5G-SUCI-in-USIM'
value_map = { 'SUCI-in-UE': False, 'SUCI-in-USIM': True }
example_input = sorted(value_map.keys())[0]
class SuciRi(ConfigurableParameter):
"""SUCI Routing Indicator as in section 4.4.11.11 of 3GPP TS 31.102"""
name = '5G-SUCI-RI'
allow_chars = '0123456789'
min_len = 1
max_len = 4
allow_types = (str,)
example_input = '0'
default_source = param_source.ConstantSource
KEY_RI = "routing_indicator"
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
for pe in pes.get_pes_for_type('df-5gs'):
f_ri = pe.files.get('ef-routing-indicator', None)
if f_ri is None:
continue
ef_ri = EF_Routing_Indicator()
ri = ef_ri.decode_bin(f_ri.body)
ri[cls.KEY_RI] = str(val)
f_ri.body = ef_ri.encode_bin(ri)
pe.file2pe(f_ri)
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('df-5gs'):
f_ri = pe.files.get('ef-routing-indicator', None)
if f_ri is None:
continue
ef_ri = EF_Routing_Indicator()
try:
ri = ef_ri.decode_bin(f_ri.body)
yield { cls.name: ri.get(cls.KEY_RI) }
except:
pass
class SuciCalcInfo(ConfigurableParameter):
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102"""
name = '5G-SUCI-CalcInfo'
example_input = '{}'
default_source = param_source.ConstantSource
allow_types = (str,)
max_len = 2000
@classmethod
def validate_val(cls, val):
val = super().validate_val(val)
if not val:
raise ValueError("SUCI Calc Info value is empty -- should at least be an empty dict like '{}'")
# check that it is a dict something like
# {
# "prot_scheme_id_list": [
# {"priority": 0, "identifier": 2, "key_index": 1},
# {"priority": 1, "identifier": 1, "key_index": 2},
# ],
# "hnet_pubkey_list": [
# {"hnet_pubkey_identifier": 27,
# "hnet_pubkey": "0472DA71976234CE833A6907425867B82E074D44EF907DFB4B3E21C1C2256EBCD15A7DED52FCBB097A4ED250E036C7B9C8C7004C4EEDC4F068CD7BF8D3F900E3B4"},
# {"hnet_pubkey_identifier": 30,
# "hnet_pubkey": "5A8D38864820197C3394B92613B20B91633CBD897119273BF8E4A6F4EEC0A650"},
# ],
# }
try:
d = json.loads(val)
except json.decoder.JSONDecodeError as e:
raise ValueError(f"Cannot parse SUCI Calc Info: {e}") from e
KEY_PSI_LIST = 'prot_scheme_id_list'
KEY_HPK_LIST = 'hnet_pubkey_list'
KEYS_D = set((KEY_HPK_LIST, KEY_PSI_LIST))
KEYS_PSI = set(('identifier', 'key_index', 'priority'))
KEYS_HPK = set(('hnet_pubkey_identifier', 'hnet_pubkey'))
if not (isinstance(d, dict)
and set(d.keys()) == KEYS_D):
raise ValueError(f"Unexpected structure in SUCI Calc Info: expected dict with entries {KEYS_D}")
psi = d.get(KEY_PSI_LIST, None)
if not all((set(e.keys()) == KEYS_PSI) for e in psi):
raise ValueError("Unexpected structure in SUCI Calc Info:"
f" in {KEY_PSI_LIST}, expected dict with entries {KEYS_PSI}")
hpk = d.get(KEY_HPK_LIST, None)
if not all((set(e.keys()) == KEYS_HPK) for e in hpk):
raise ValueError("Unexpected structure in SUCI Calc Info:"
f" in {KEY_HPK_LIST}, expected dict with entries {KEYS_HPK}")
return d
@classmethod
def _apply_suci(cls, pes: ProfileElementSequence, val, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
for pe in pes.get_pes_for_type(pe_type):
f_sucici = pe.files.get(pe_file, None)
if not f_sucici:
continue
ef_sucici = EF_SUCI_Calc_Info()
f_sucici.body = ef_sucici.encode_bin(val)
pe.file2pe(f_sucici)
@classmethod
def apply_val(cls, pes: ProfileElementSequence, val):
cls._apply_suci(pes, val, "df-5gs", "ef-suci-calc-info")
cls._apply_suci(pes, val, "df-saip", "ef-suci-calc-info-usim")
@classmethod
def _get_suci(cls, pes: ProfileElementSequence, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
for pe in pes.get_pes_for_type(pe_type):
f_sucici = pe.files.get(pe_file, None)
if not f_sucici:
continue
ef_sucici = EF_SUCI_Calc_Info()
sucici = ef_sucici.decode_bin(f_sucici.body)
# normalize to string (bytes cannot go into json)
for hnet_pubkey in sucici.get('hnet_pubkey_list', ()):
val = hnet_pubkey['hnet_pubkey']
if isinstance(val, bytes):
val = b2h(val)
hnet_pubkey['hnet_pubkey'] = val
yield { cls.name: json.dumps(sucici) }
@classmethod
def get_values_from_pes(cls, pes: ProfileElementSequence):
yield from cls._get_suci(pes, "df-5gs", "ef-suci-calc-info")
yield from cls._get_suci(pes, "df-saip", "ef-suci-calc-info-usim")

View File

@@ -327,7 +327,7 @@ class EF_SUCI_Calc_Info(TransparentEF):
"""conversion method to generate list of {hnet_pubkey_identifier, hnet_pubkey} dicts
from flat [{hnet_pubkey_identifier: }, {net_pubkey: }, ...] list"""
out = []
while len(l):
while l:
a = l.pop(0)
b = l.pop(0)
z = {**a, **b}

View File

@@ -21,6 +21,7 @@ import io
import sys
import unittest
import io
import json
from importlib import resources
from osmocom.utils import hexstr
from pySim.esim.saip import ProfileElementSequence
@@ -60,11 +61,15 @@ class ConfigurableParameterTest(unittest.TestCase):
)
class Paramtest:
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
iff_present_default = False
def __init__(self, param_cls, val, expect_val, expect_clean_val=None, iff_present=None):
self.param_cls = param_cls
self.val = val
self.expect_clean_val = expect_clean_val
self.expect_val = expect_val
if iff_present is None:
iff_present = Paramtest.iff_present_default
self.iff_present = iff_present
param_tests = [
Paramtest(param_cls=p13n.Imsi, val='123456',
@@ -267,7 +272,65 @@ class ConfigurableParameterTest(unittest.TestCase):
'11111111111111111111111111111111'
'22222222222222222222222222222222'),
]
Paramtest(param_cls=p13n.MncLen,
val='2',
expect_clean_val=2,
expect_val='2'),
Paramtest(param_cls=p13n.MncLen,
val=3,
expect_clean_val=3,
expect_val='3'),
]
Paramtest.iff_present_default = True
sucici = {
"prot_scheme_id_list": [
{"priority": 0, "identifier": 2, "key_index": 1},
{"priority": 1, "identifier": 1, "key_index": 2},
],
"hnet_pubkey_list": [
{"hnet_pubkey_identifier": 27,
"hnet_pubkey": "0472da71976234ce833a6907425867b82e074d44ef907dfb4b3e21c1c2256ebcd15a7ded52fcbb097a4ed250e036c7b9c8c7004c4eedc4f068cd7bf8d3f900e3b4"},
{"hnet_pubkey_identifier": 30,
"hnet_pubkey": "5a8d38864820197c3394b92613b20b91633cbd897119273bf8e4a6f4eec0a650"},
],
}
param_tests.extend([
Paramtest(param_cls=p13n.SuciActive, val='SUCI-on',
expect_clean_val=True,
expect_val={'5G-SUCI-active': 'SUCI-on'}),
Paramtest(param_cls=p13n.SuciActive, val='SUCI-off',
expect_clean_val=False,
expect_val={'5G-SUCI-active': 'SUCI-off'}),
Paramtest(param_cls=p13n.SuciInUsim, val='SUCI-in-UE',
expect_clean_val=False,
expect_val={'5G-SUCI-in-USIM': 'SUCI-in-UE'}),
Paramtest(param_cls=p13n.SuciInUsim, val='SUCI-in-USIM',
expect_clean_val=True,
expect_val={'5G-SUCI-in-USIM': 'SUCI-in-USIM'}),
Paramtest(param_cls=p13n.SuciRi, val='123',
expect_clean_val='123',
expect_val={'5G-SUCI-RI': '123'}),
Paramtest(param_cls=p13n.SuciRi, val='0',
expect_clean_val='0',
expect_val={'5G-SUCI-RI': '0'}),
Paramtest(param_cls=p13n.SuciRi, val='9999',
expect_clean_val='9999',
expect_val={'5G-SUCI-RI': '9999'}),
Paramtest(param_cls=p13n.SuciCalcInfo,
val=json.dumps(sucici),
expect_clean_val=sucici,
expect_val={'5G-SUCI-CalcInfo': json.dumps(sucici)}),
])
Paramtest.iff_present_default = False
for sdkey_cls in (
# thin out the number of tests, as a compromise between completeness and test runtime
@@ -363,7 +426,8 @@ class ConfigurableParameterTest(unittest.TestCase):
for t in param_tests:
test_idx += 1
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
testlog = []
testlog.append(f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})')
param = None
try:
@@ -371,21 +435,32 @@ class ConfigurableParameterTest(unittest.TestCase):
param.input_value = t.val
param.validate()
except ValueError as e:
raise ValueError(f'{logloc}: {e}') from e
raise ValueError(f'{" ".join(testlog)}: {e}') from e
clean_val = param.value
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
testlog.append(f'clean_val={valtypestr(clean_val)}')
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
raise ValueError(f'{logloc}: expected'
raise ValueError(f'{" ".join(testlog)}: expected'
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
# pes = copy.deepcopy(orig_pes)
pes = ProfileElementSequence.from_der(der)
found = list((t.param_cls.get_value_from_pes(pes) or {}).values())
testlog.append(f"previous value: {found}")
if t.iff_present and not found:
testlog.append("skipping, param not in template.")
output = "\nskip: " + "\n ".join(testlog)
outputs.append(output)
print(output)
continue
try:
param.apply(pes)
except ValueError as e:
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
raise ValueError(f'{" ".join(testlog)} apply_val(clean_val): {e}') from e
changed_der = pes.to_der()
@@ -403,22 +478,18 @@ class ConfigurableParameterTest(unittest.TestCase):
else:
read_back_val_type = f'{type(read_back_val).__name__}'
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
testlog.append(f'read_back_val={valtypestr(read_back_val)}')
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
raise ValueError(f'{" ".join(testlog)}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
expect_val = t.expect_val
if not isinstance(expect_val, dict):
expect_val = { t.param_cls.get_name(): expect_val }
if read_back_val != expect_val:
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
raise ValueError(f'{" ".join(testlog)}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
ok = logloc.replace(' clean_val', '\n\tclean_val'
).replace(' read_back_val', '\n\tread_back_val'
).replace('=', '=\t'
)
output = f'\nok: {ok}'
output = "\nok: " + "\n ".join(testlog)
outputs.append(output)
print(output)

File diff suppressed because it is too large Load Diff