mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
Compare commits
41 Commits
pmaier/fix
...
neels/saip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4b5841f78 | ||
|
|
efc3f06f38 | ||
|
|
7d73ebf641 | ||
|
|
462346bb71 | ||
|
|
0c33330056 | ||
|
|
aa2eaee266 | ||
|
|
03e17a7366 | ||
|
|
59bec08eae | ||
|
|
5c34d36b48 | ||
|
|
5c74d5b4b3 | ||
|
|
6b2e5645b3 | ||
|
|
f9b581a985 | ||
|
|
789734d8a5 | ||
|
|
d364174ca0 | ||
|
|
b52d3ab9ed | ||
|
|
49d4ba38e0 | ||
|
|
8cc7421faa | ||
|
|
44dbbe5509 | ||
|
|
8b1db4f0d2 | ||
|
|
ae8465532c | ||
|
|
0f00bd14d8 | ||
|
|
ff7c252d63 | ||
|
|
f8aa78d370 | ||
|
|
ef995383a6 | ||
|
|
b39f69ba89 | ||
|
|
3bee435c2a | ||
|
|
9e33ae0486 | ||
|
|
913e8d12d4 | ||
|
|
346e2aeb01 | ||
|
|
b71fa5307e | ||
|
|
e065cd50ed | ||
|
|
5eec7b7478 | ||
|
|
9bdf1e3dee | ||
|
|
d5e550dcc4 | ||
|
|
8fb451b732 | ||
|
|
eca2fd39c0 | ||
|
|
2c9374ac53 | ||
|
|
c9c5bd2470 | ||
|
|
9bc795e7a1 | ||
|
|
7886e59f82 | ||
|
|
a65886ca3f |
@@ -16,12 +16,6 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import requests
|
||||
from klein import Klein
|
||||
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
|
||||
from twisted.internet.posixbase import PosixReactorBase
|
||||
from pathlib import Path
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import time
|
||||
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
class DownloadOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/downloadOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType
|
||||
}
|
||||
input_mandatory = ['header']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'iccid': param.Iccid,
|
||||
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
|
||||
class ConfirmOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/confirmOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
'smdsAddress': param.SmdsAddress,
|
||||
'releaseFlag': param.ReleaseFlag,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'releaseFlag']
|
||||
input_mandatory = ['iccid', 'releaseFlag']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'eid': param.Eid,
|
||||
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
class CancelOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/cancelOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
|
||||
}
|
||||
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
|
||||
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
|
||||
class ReleaseProfile(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/releaseProfile'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid']
|
||||
input_mandatory = ['iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
|
||||
class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType,
|
||||
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
'notificationPointStatus': param.NotificationPointStatus,
|
||||
'resultData': param.ResultData,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
expected_http_status = 204
|
||||
|
||||
|
||||
class Es2pApiClient:
|
||||
"""Main class representing a full ES2+ API client. Has one method for each API function."""
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
|
||||
@@ -217,17 +206,18 @@ class Es2pApiClient:
|
||||
if client_cert:
|
||||
self.session.cert = client_cert
|
||||
|
||||
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
|
||||
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
|
||||
|
||||
def call_downloadOrder(self, data: dict) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(data, self._gen_func_id())
|
||||
@@ -247,116 +237,3 @@ class Es2pApiClient:
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
|
||||
|
||||
class Es2pApiServerHandlerSmdpp(abc.ABC):
|
||||
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_downloadOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_confirmOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_cancelOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_releaseProfile(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServerHandlerMno(abc.ABC):
|
||||
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServer(abc.ABC):
|
||||
"""Main class representing a full ES2+ API server. Has one method for each API function."""
|
||||
app = None
|
||||
|
||||
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
|
||||
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
|
||||
self.port = port
|
||||
self.interface = interface
|
||||
if server_cert:
|
||||
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
|
||||
else:
|
||||
self.server_cert = None
|
||||
if client_cert_verify:
|
||||
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
|
||||
else:
|
||||
self.client_cert_verify = None
|
||||
|
||||
def reactor(self, reactor: PosixReactorBase):
|
||||
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
|
||||
if self.server_cert:
|
||||
if self.client_cert_verify:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
|
||||
return defer.Deferred()
|
||||
|
||||
class Es2pApiServerSmdpp(Es2pApiServer):
|
||||
"""ES2+ (SMDP+ side) API Server."""
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
|
||||
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
|
||||
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
|
||||
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(DownloadOrder.path)
|
||||
def call_downloadOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(request)
|
||||
|
||||
@app.route(ConfirmOrder.path)
|
||||
def call_confirmOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
return self.confirmOrder.call(request)
|
||||
|
||||
@app.route(CancelOrder.path)
|
||||
def call_cancelOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
return self.cancelOrder.call(request)
|
||||
|
||||
@app.route(ReleaseProfile.path)
|
||||
def call_releaseProfile(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
return self.releaseProfile.call(request)
|
||||
|
||||
class Es2pApiServerMno(Es2pApiServer):
|
||||
"""ES2+ (MNO side) API Server."""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
|
||||
handler.call_handleDownloadProgressInfo)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(HandleDownloadProgressInfo.path)
|
||||
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(request)
|
||||
|
||||
@@ -155,11 +155,11 @@ class Es9pApiClient:
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
|
||||
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
|
||||
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
|
||||
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
@@ -21,8 +21,6 @@ import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
import base64
|
||||
from twisted.web.server import Request
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
|
||||
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
||||
raise ValueError('Unknown/unspecified status "%s"' % status)
|
||||
|
||||
class JsonRequestHeader(ApiParam):
|
||||
"""SGP.22 section 6.5.1.3."""
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
func_req_id = data.get('functionRequesterIdentifier')
|
||||
if not func_req_id:
|
||||
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
||||
func_call_id = data.get('functionCallIdentifier')
|
||||
if not func_call_id:
|
||||
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
||||
|
||||
class HttpStatusError(Exception):
|
||||
pass
|
||||
@@ -173,118 +161,65 @@ class ApiError(Exception):
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
||||
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
||||
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
||||
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
||||
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
||||
# prefix.
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder')
|
||||
path = None
|
||||
|
||||
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
||||
input_params = {}
|
||||
|
||||
# list of mandatory input parameters
|
||||
input_mandatory = []
|
||||
|
||||
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
||||
output_params = {}
|
||||
|
||||
# list of mandatory output parameters (for successful response)
|
||||
output_mandatory = []
|
||||
|
||||
# list of mandatory output parameters (for failed response)
|
||||
output_mandatory_failed = []
|
||||
|
||||
# expected HTTP status code of the response
|
||||
expected_http_status = 200
|
||||
|
||||
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
||||
http_method = 'POST'
|
||||
|
||||
# additional custom HTTP headers (client requests)
|
||||
extra_http_req_headers = {}
|
||||
|
||||
# additional custom HTTP headers (server responses)
|
||||
extra_http_res_headers = {}
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def __new__(cls, *args, role = 'legacy_client', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
args: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
|
||||
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
"""
|
||||
|
||||
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
|
||||
# decode_ methods below). The dictionary will not include any dunder/magic methods
|
||||
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
|
||||
|
||||
# Normal instantiation as JsonHttpApiFunction:
|
||||
if len(args) == 0 and len(kwargs) == 0:
|
||||
return type(cls.__name__, (abc.ABC,), cls_attr)()
|
||||
|
||||
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
|
||||
if role == 'legacy_client':
|
||||
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
|
||||
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
|
||||
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
|
||||
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
|
||||
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
result.legacy = True
|
||||
return result
|
||||
elif role == 'client':
|
||||
# Create a JsonHttpApiFunction in client role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
elif role == 'server':
|
||||
# Create a JsonHttpApiFunction in server role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
|
||||
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
else:
|
||||
raise ValueError('Invalid role \'%s\' specified' % role)
|
||||
|
||||
def encode_client(self, data: dict) -> dict:
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
|
||||
# pylint: disable=no-member
|
||||
if hasattr(self, 'legacy') and self.legacy:
|
||||
output[p] = JsonRequestHeader.encode(v)
|
||||
else:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_client(self, data: dict) -> dict:
|
||||
def decode(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
if 'header' in self.output_params:
|
||||
# let's first do the header, it's special
|
||||
if not 'header' in data:
|
||||
raise ValueError('Mandatory output parameter "header" missing')
|
||||
hdr_class = self.output_params.get('header')
|
||||
output['header'] = hdr_class.decode(data['header'])
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
# we can only expect mandatory parameters to be present in case of successful execution
|
||||
for p in self.output_mandatory:
|
||||
if p == 'header':
|
||||
continue
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
@@ -296,167 +231,35 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def encode_server(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.output_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_server(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the request body."""
|
||||
output = {}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
class JsonHttpApiClient():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
||||
session: requests.Session):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
|
||||
func_req_id : function requestor id to use for requests
|
||||
session : session object (requests)
|
||||
"""
|
||||
self.api_func = api_func
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
||||
json-serializable dict. Output data is returned as json-deserialized dict."""
|
||||
|
||||
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
||||
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
||||
# field is checked by the encode_client method)
|
||||
if func_call_id:
|
||||
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_client(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
url = self.url_prefix + self.path
|
||||
encoded = json.dumps(self.encode(data, func_call_id))
|
||||
req_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
req_headers.update(self.api_func.extra_http_req_headers)
|
||||
req_headers.update(self.extra_http_req_headers)
|
||||
|
||||
# Perform HTTP request
|
||||
url = self.url_prefix + self.api_func.path
|
||||
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
||||
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
||||
logger.debug("HTTP RSP: %s" % (response.content))
|
||||
|
||||
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
||||
# SGP.22, section 6.5.1)
|
||||
if response.status_code != self.api_func.expected_http_status:
|
||||
if response.status_code != self.expected_http_status:
|
||||
raise HttpStatusError(response)
|
||||
if response.content and not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
raise HttpHeaderError(response)
|
||||
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
||||
raise HttpHeaderError(response)
|
||||
|
||||
# Decode response and return the result back to the caller
|
||||
if response.content:
|
||||
output = self.api_func.decode_client(response.json())
|
||||
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
||||
# (the presence of the header field is checked by the decode_client method)
|
||||
if 'header' in output:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
return output
|
||||
if response.headers.get('Content-Type').startswith('application/json'):
|
||||
return self.decode(response.json())
|
||||
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
|
||||
return { 'data': response.content.decode('utf-8') }
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
|
||||
return None
|
||||
|
||||
class JsonHttpApiServer():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
call_handler : handler function to process the request. This function must accept the
|
||||
decoded request as a dictionary. The handler function must return a tuple consisting
|
||||
of the response in the form of a dictionary (may be empty), and a function execution
|
||||
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
||||
"""
|
||||
self.api_func = api_func
|
||||
if call_handler:
|
||||
self.call_handler = call_handler
|
||||
else:
|
||||
self.call_handler = self.default_handler
|
||||
|
||||
def default_handler(self, data: dict) -> (dict, str):
|
||||
"""default handler, used in case no call handler is provided."""
|
||||
logger.error("no handler function for request: %s" % str(data))
|
||||
return {}, 'Failed'
|
||||
|
||||
def call(self, request: Request) -> str:
|
||||
""" Process an incoming request.
|
||||
Args:
|
||||
request : request object as received using twisted.web.server
|
||||
Returns:
|
||||
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
||||
provided the request object)
|
||||
"""
|
||||
|
||||
# Make sure the request is done with the correct HTTP method
|
||||
if (request.method.decode() != self.api_func.http_method):
|
||||
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
||||
|
||||
# Decode the request
|
||||
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
||||
|
||||
# Run call handler (see above)
|
||||
data, fe_status = self.call_handler(decoded_request)
|
||||
|
||||
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
||||
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
||||
if fe_status:
|
||||
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_server(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
res_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
res_headers.update(self.api_func.extra_http_res_headers)
|
||||
for header, value in res_headers.items():
|
||||
request.setHeader(header, value)
|
||||
request.setResponseCode(self.api_func.expected_http_status)
|
||||
|
||||
# Return the encoded result back to the caller for sending (using twisted/klein)
|
||||
return encoded
|
||||
|
||||
|
||||
@@ -1071,6 +1071,13 @@ class SecurityDomainKey:
|
||||
'keyVersionNumber': bytes([self.key_version_number]),
|
||||
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
||||
|
||||
def get_key_component(self, key_type):
|
||||
for kc in self.key_components:
|
||||
if kc.key_type == key_type:
|
||||
return kc.key_data
|
||||
return None
|
||||
|
||||
|
||||
class ProfileElementSD(ProfileElement):
|
||||
"""Class representing a securityDomain ProfileElement."""
|
||||
type = 'securityDomain'
|
||||
|
||||
357
pySim/esim/saip/batch.py
Normal file
357
pySim/esim/saip/batch.py
Normal file
@@ -0,0 +1,357 @@
|
||||
"""Implementation of Personalization of eSIM profiles in SimAlliance/TCA Interoperable Profile:
|
||||
Run a batch of N personalizations"""
|
||||
|
||||
# (C) 2025-2026 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: nhofmeyr@sysmocom.de
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import copy
|
||||
import pprint
|
||||
from typing import List, Generator
|
||||
from pySim.esim.saip.personalization import ConfigurableParameter
|
||||
from pySim.esim.saip import param_source
|
||||
from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
|
||||
from pySim.global_platform import KeyUsageQualifier
|
||||
from osmocom.utils import b2h
|
||||
|
||||
class BatchPersonalization:
|
||||
"""Produce a series of eSIM profiles from predefined parameters.
|
||||
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
|
||||
|
||||
Usage example:
|
||||
|
||||
der_input = some_file.open('rb').read()
|
||||
pes = ProfileElementSequence.from_der(der_input)
|
||||
p = pers.BatchPersonalization(
|
||||
n=10,
|
||||
src_pes=pes,
|
||||
csv_rows=get_csv_reader())
|
||||
|
||||
p.add_param_and_src(
|
||||
personalization.Iccid(),
|
||||
param_source.IncDigitSource(
|
||||
num_digits=18,
|
||||
first_value=123456789012340001,
|
||||
last_value=123456789012340010))
|
||||
|
||||
# add more parameters here, using ConfigurableParameter and ParamSource subclass instances to define the profile
|
||||
# ...
|
||||
|
||||
# generate all 10 profiles (from n=10 above)
|
||||
for result_pes in p.generate_profiles():
|
||||
upp = result_pes.to_der()
|
||||
store_upp(upp)
|
||||
"""
|
||||
|
||||
class ParamAndSrc:
|
||||
'tie a ConfigurableParameter to a source of actual values'
|
||||
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
|
||||
self.param = param
|
||||
self.src = src
|
||||
|
||||
def __init__(self,
|
||||
n: int,
|
||||
src_pes: ProfileElementSequence,
|
||||
params: list[ParamAndSrc]=None,
|
||||
csv_rows: Generator=None,
|
||||
):
|
||||
"""
|
||||
n: number of eSIM profiles to generate.
|
||||
src_pes: a decoded eSIM profile as ProfileElementSequence, to serve as template. This is not modified, only
|
||||
copied.
|
||||
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
|
||||
profile values.
|
||||
csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column
|
||||
headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||
ParamSource.get_next(), such that ParamSource implementations can access the row items.
|
||||
See param_source.CsvSource.
|
||||
"""
|
||||
self.n = n
|
||||
self.params = params or []
|
||||
self.src_pes = src_pes
|
||||
self.csv_rows = csv_rows
|
||||
|
||||
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
|
||||
self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
|
||||
|
||||
def generate_profiles(self):
|
||||
# get first row of CSV: column names
|
||||
csv_columns = None
|
||||
if self.csv_rows:
|
||||
try:
|
||||
csv_columns = next(self.csv_rows)
|
||||
except StopIteration as e:
|
||||
raise ValueError('the input CSV file appears to be empty') from e
|
||||
|
||||
for i in range(self.n):
|
||||
csv_row = None
|
||||
if self.csv_rows and csv_columns:
|
||||
try:
|
||||
csv_row_list = next(self.csv_rows)
|
||||
except StopIteration as e:
|
||||
raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
|
||||
|
||||
csv_row = dict(zip(csv_columns, csv_row_list))
|
||||
|
||||
pes = copy.deepcopy(self.src_pes)
|
||||
|
||||
for p in self.params:
|
||||
try:
|
||||
input_value = p.src.get_next(csv_row=csv_row)
|
||||
assert input_value is not None
|
||||
value = p.param.__class__.validate_val(input_value)
|
||||
p.param.__class__.apply_val(pes, value)
|
||||
except Exception as e:
|
||||
raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}') from e
|
||||
|
||||
yield pes
|
||||
|
||||
|
||||
class UppAudit(dict):
|
||||
"""
|
||||
Key-value pairs collected from a single UPP DER or PES.
|
||||
|
||||
UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
|
||||
'''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
|
||||
both 'IMSI' and 'IMSI-ACC' parameters.
|
||||
|
||||
e.g.
|
||||
UppAudit.from_der(my_der, [Imsi, ])
|
||||
--> {'IMSI': '001010000000023', 'IMSI-ACC': '5'}
|
||||
|
||||
(where 'IMSI' == Imsi.name)
|
||||
|
||||
Read all parameters listed in params. params is a list of either ConfigurableParameter classes or
|
||||
ConfigurableParameter class instances. This calls only classmethods, so each entry in params can either be the
|
||||
class itself, or a class-instance of, a (non-abstract) ConfigurableParameter subclass.
|
||||
For example, params = [Imsi, ] is equivalent to params = [Imsi(), ].
|
||||
|
||||
For der_size=True, also include a {'der_size':12345} entry.
|
||||
|
||||
For additional_sd_keys=True, output also all Security Domain KVN that there are *no* ConfigurableParameter
|
||||
subclasses for. For example, SCP80 has reserved kvn 0x01..0x0f, but we offer only Scp80Kvn01, Scp80Kvn02,
|
||||
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
|
||||
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
|
||||
unexpected / unusual kvn.
|
||||
'''
|
||||
|
||||
# make an instance of this class
|
||||
upp_audit = cls()
|
||||
|
||||
if der_size:
|
||||
upp_audit['der_size'] = set((len(der), ))
|
||||
|
||||
pes = ProfileElementSequence.from_der(der)
|
||||
for param in params:
|
||||
try:
|
||||
for valdict in param.get_values_from_pes(pes):
|
||||
upp_audit.add_values(valdict)
|
||||
except Exception as e:
|
||||
raise ValueError(f'Error during audit for parameter {param}: {e}') from e
|
||||
|
||||
if not additional_sd_keys:
|
||||
return upp_audit
|
||||
|
||||
# additional_sd_keys
|
||||
for pe in pes.pe_list:
|
||||
if pe.type != 'securityDomain':
|
||||
continue
|
||||
assert isinstance(pe, ProfileElementSD)
|
||||
|
||||
for key in pe.keys:
|
||||
audit_key = f'SdKey_KVN{key.key_version_number:02x}_ID{key.key_identifier:02x}'
|
||||
kuq_bin = KeyUsageQualifier.build(key.key_usage_qualifier).hex()
|
||||
audit_val = f'{key.key_components=!r} key_usage_qualifier=0x{kuq_bin}={key.key_usage_qualifier!r}'
|
||||
upp_audit[audit_key] = set((audit_val, ))
|
||||
|
||||
return upp_audit
|
||||
|
||||
def get_single_val(self, key, validate=True, allow_absent=False, absent_val=None):
|
||||
"""
|
||||
Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').
|
||||
Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value,
|
||||
return that value. When they do not agree, raise a ValueError.
|
||||
"""
|
||||
# key should be a string, but if someone passes a ConfigurableParameter, just use its default name
|
||||
if ConfigurableParameter.is_super_of(key):
|
||||
key = key.get_name()
|
||||
|
||||
assert isinstance(key, str)
|
||||
v = self.get(key)
|
||||
if v is None and allow_absent:
|
||||
return absent_val
|
||||
if not isinstance(v, set):
|
||||
raise ValueError(f'audit value should be a set(), got {v!r}')
|
||||
if len(v) != 1:
|
||||
raise ValueError(f'expected a single value for {key}, got {v!r}')
|
||||
v = tuple(v)[0]
|
||||
return v
|
||||
|
||||
@staticmethod
|
||||
def audit_val_to_str(v):
|
||||
"""
|
||||
Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
|
||||
audit values are always python sets. Turn it into a nice string representation: only the value when it is
|
||||
unambiguous, otherwise a list of the ambiguous values.
|
||||
A value may also be completely absent, then return 'not present'.
|
||||
"""
|
||||
def try_single_val(w):
|
||||
'change single-entry sets to just the single value'
|
||||
if isinstance(w, set):
|
||||
if len(w) == 1:
|
||||
return tuple(w)[0]
|
||||
if len(w) == 0:
|
||||
return None
|
||||
return w
|
||||
|
||||
v = try_single_val(v)
|
||||
if isinstance(v, bytes):
|
||||
v = bytes_to_hexstr(v)
|
||||
if v is None:
|
||||
return 'not present'
|
||||
return str(v)
|
||||
|
||||
def get_val_str(self, key):
|
||||
"""Return a string of the value stored for the given key"""
|
||||
return UppAudit.audit_val_to_str(self.get(key))
|
||||
|
||||
def add_values(self, src:dict):
|
||||
"""self and src are both a dict of sets.
|
||||
For example from
|
||||
self == { 'a': set((123,)) }
|
||||
and
|
||||
src == { 'a': set((456,)), 'b': set((789,)) }
|
||||
then after this function call:
|
||||
self == { 'a': set((123, 456,)), 'b': set((789,)) }
|
||||
"""
|
||||
assert isinstance(src, dict)
|
||||
for key, srcvalset in src.items():
|
||||
dstvalset = self.get(key)
|
||||
if dstvalset is None:
|
||||
dstvalset = set()
|
||||
self[key] = dstvalset
|
||||
dstvalset.add(srcvalset)
|
||||
|
||||
def __str__(self):
|
||||
return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
|
||||
|
||||
class BatchAudit(list):
|
||||
"""
|
||||
Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
|
||||
Produce an output CSV.
|
||||
|
||||
Usage example:
|
||||
|
||||
ba = BatchAudit(params=(personalization.Iccid, ))
|
||||
for upp_der in upps:
|
||||
ba.add_audit(upp_der)
|
||||
print(ba.summarize())
|
||||
|
||||
with open('output.csv', 'wb') as csv_data:
|
||||
csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
|
||||
csv.writer(csv_str).writerows( ba.to_csv_rows() )
|
||||
csv_str.flush()
|
||||
|
||||
BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
|
||||
"""
|
||||
|
||||
def __init__(self, params:List):
|
||||
assert params
|
||||
self.params = params
|
||||
|
||||
def add_audit(self, upp_der:bytes):
|
||||
audit = UppAudit.from_der(upp_der, self.params)
|
||||
self.append(audit)
|
||||
return audit
|
||||
|
||||
def summarize(self):
|
||||
batch_audit = UppAudit()
|
||||
|
||||
audits = self
|
||||
|
||||
if len(audits) > 2:
|
||||
val_sep = ', ..., '
|
||||
else:
|
||||
val_sep = ', '
|
||||
|
||||
first_audit = None
|
||||
last_audit = None
|
||||
if len(audits) >= 1:
|
||||
first_audit = audits[0]
|
||||
if len(audits) >= 2:
|
||||
last_audit = audits[-1]
|
||||
|
||||
if first_audit:
|
||||
if last_audit:
|
||||
for key in first_audit.keys():
|
||||
first_val = first_audit.get_val_str(key)
|
||||
last_val = last_audit.get_val_str(key)
|
||||
|
||||
if first_val == last_val:
|
||||
val = first_val
|
||||
else:
|
||||
val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
|
||||
val = val_sep_with_newline.join((first_val, last_val))
|
||||
batch_audit[key] = val
|
||||
else:
|
||||
batch_audit.update(first_audit)
|
||||
|
||||
return batch_audit
|
||||
|
||||
def to_csv_rows(self, headers=True, sort_key=None):
|
||||
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
|
||||
columns = set()
|
||||
for audit in self:
|
||||
columns.update(audit.keys())
|
||||
|
||||
columns = tuple(sorted(columns, key=sort_key))
|
||||
|
||||
if headers:
|
||||
yield columns
|
||||
|
||||
for audit in self:
|
||||
yield (audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
|
||||
|
||||
def bytes_to_hexstr(b:bytes, sep=''):
|
||||
return sep.join(f'{x:02x}' for x in b)
|
||||
|
||||
def esim_profile_introspect(upp):
|
||||
pes = ProfileElementSequence.from_der(upp.read())
|
||||
d = {}
|
||||
d['upp'] = repr(pes)
|
||||
|
||||
def show_bytes_as_hexdump(item):
|
||||
if isinstance(item, bytes):
|
||||
return bytes_to_hexstr(item)
|
||||
if isinstance(item, list):
|
||||
return list(show_bytes_as_hexdump(i) for i in item)
|
||||
if isinstance(item, tuple):
|
||||
return tuple(show_bytes_as_hexdump(i) for i in item)
|
||||
if isinstance(item, dict):
|
||||
d = {}
|
||||
for k, v in item.items():
|
||||
d[k] = show_bytes_as_hexdump(v)
|
||||
return d
|
||||
return item
|
||||
|
||||
l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
|
||||
d['pp'] = pprint.pformat(l, width=120)
|
||||
return d
|
||||
228
pySim/esim/saip/param_source.py
Normal file
228
pySim/esim/saip/param_source.py
Normal file
@@ -0,0 +1,228 @@
|
||||
# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
|
||||
#
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: nhofmeyr@sysmocom.de
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import secrets
|
||||
import re
|
||||
from osmocom.utils import b2h
|
||||
|
||||
class ParamSourceExn(Exception):
|
||||
pass
|
||||
|
||||
class ParamSourceExhaustedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSourceUndefinedExn(ParamSourceExn):
|
||||
pass
|
||||
|
||||
class ParamSource:
|
||||
"""abstract parameter source. For usage, see personalization.BatchPersonalization."""
|
||||
|
||||
# This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
|
||||
name = "none"
|
||||
numeric_base = None # or 10 or 16
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
"""Subclasses implement this:
|
||||
if a parameter source defines some string input magic, override this function.
|
||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
||||
so the user can enter '0000' to get a four digit random number."""
|
||||
return cls(s)
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
"""Subclasses implement this: return the next value from the parameter source.
|
||||
When there are no more values from the source, raise a ParamSourceExhaustedExn.
|
||||
This default implementation is an empty source."""
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
|
||||
class ConstantSource(ParamSource):
|
||||
"""one value for all"""
|
||||
name = "constant"
|
||||
|
||||
def __init__(self, val:str):
|
||||
self.val = val
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
return self.val
|
||||
|
||||
class InputExpandingParamSource(ParamSource):
|
||||
|
||||
@classmethod
|
||||
def expand_str(cls, s:str):
|
||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||
if "*" not in s:
|
||||
return s
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
||||
if len(tokens) < 3:
|
||||
return s
|
||||
parts = []
|
||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||
parts.append(unchanged)
|
||||
repeat = int(repeat_str)
|
||||
parts.append(snippet * repeat)
|
||||
return "".join(parts)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
return cls(cls.expand_str(s))
|
||||
|
||||
class DecimalRangeSource(InputExpandingParamSource):
|
||||
"""abstract: decimal numbers with a value range"""
|
||||
|
||||
numeric_base = 10
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
"""
|
||||
See also from_str().
|
||||
|
||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
||||
num_digits: fixed number of digits (possibly with leading zeros) to generate.
|
||||
first_value, last_value: the decimal range in which to provide digits.
|
||||
"""
|
||||
num_digits = int(num_digits)
|
||||
first_value = int(first_value)
|
||||
last_value = int(last_value)
|
||||
assert num_digits > 0
|
||||
assert first_value <= last_value
|
||||
self.num_digits = num_digits
|
||||
self.val_first_last = (first_value, last_value)
|
||||
|
||||
def val_to_digit(self, val:int):
|
||||
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
|
||||
if ".." in s:
|
||||
first_str, last_str = s.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = s.strip()
|
||||
last_str = None
|
||||
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str) if last_str is not None else "9" * len(first_str)
|
||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
||||
|
||||
class RandomSourceMixin:
|
||||
random_impl = secrets.SystemRandom()
|
||||
|
||||
class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
|
||||
"""return a different sequence of random decimal digits each"""
|
||||
name = "random decimal digits"
|
||||
used_keys = set()
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random digits that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randint(*self.val_first_last)
|
||||
if val in RandomDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomDigitSource.used_keys.add(val)
|
||||
break
|
||||
return self.val_to_digit(val)
|
||||
|
||||
class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
"""return a different sequence of random hexadecimal digits each"""
|
||||
name = "random hexadecimal digits"
|
||||
numeric_base = 16
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits):
|
||||
"""see from_str()"""
|
||||
num_digits = int(num_digits)
|
||||
if num_digits < 1:
|
||||
raise ValueError("zero number of digits")
|
||||
# hex digits always come in two
|
||||
if (num_digits & 1) != 0:
|
||||
raise ValueError(f"hexadecimal value should have even number of digits, not {num_digits}")
|
||||
self.num_digits = num_digits
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
# try to generate random bytes that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randbytes(self.num_digits // 2)
|
||||
if val in RandomHexDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
continue
|
||||
RandomHexDigitSource.used_keys.add(val)
|
||||
break
|
||||
|
||||
return b2h(val)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
return cls(num_digits=len(s.strip()))
|
||||
|
||||
class IncDigitSource(DecimalRangeSource):
|
||||
"""incrementing sequence of digits"""
|
||||
name = "incrementing decimal digits"
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
super().__init__(num_digits, first_value, last_value)
|
||||
self.next_val = None
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"""Restart from the first value of the defined range passed to __init__()."""
|
||||
self.next_val = self.val_first_last[0]
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = self.next_val
|
||||
if val is None:
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
returnval = self.val_to_digit(val)
|
||||
|
||||
val += 1
|
||||
if val > self.val_first_last[1]:
|
||||
self.next_val = None
|
||||
else:
|
||||
self.next_val = val
|
||||
|
||||
return returnval
|
||||
|
||||
class CsvSource(ParamSource):
|
||||
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
|
||||
name = "from CSV"
|
||||
|
||||
def __init__(self, csv_column):
|
||||
"""
|
||||
csv_column: column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
self.csv_column = csv_column
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = None
|
||||
if csv_row:
|
||||
val = csv_row.get(self.csv_column)
|
||||
if not val:
|
||||
raise ParamSourceUndefinedExn(f"no value for CSV column {self.csv_column!r}")
|
||||
return val
|
||||
@@ -17,12 +17,22 @@
|
||||
|
||||
import abc
|
||||
import io
|
||||
from typing import List, Tuple
|
||||
import os
|
||||
import re
|
||||
import pprint
|
||||
from typing import List, Tuple, Generator, Optional
|
||||
|
||||
from osmocom.tlv import camel_to_snake
|
||||
from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid
|
||||
from pySim.esim.saip import ProfileElement, ProfileElementSequence
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
|
||||
from pySim.ts_51_011 import EF_SMSP
|
||||
from pySim.esim.saip import param_source
|
||||
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
|
||||
from pySim.esim.saip import SecurityDomainKey, SecurityDomainKeyComponent
|
||||
from pySim.global_platform import KeyUsageQualifier, KeyType
|
||||
|
||||
def unrpad(s: hexstr, c='f') -> hexstr:
|
||||
return hexstr(s.rstrip(c))
|
||||
|
||||
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
|
||||
"""In a list of tuples, remove all tuples whose first part equals 'unwanted_key'."""
|
||||
@@ -43,7 +53,6 @@ class ClassVarMeta(abc.ABCMeta):
|
||||
x = super().__new__(metacls, name, bases, namespace)
|
||||
for k, v in kwargs.items():
|
||||
setattr(x, k, v)
|
||||
setattr(x, 'name', camel_to_snake(name))
|
||||
return x
|
||||
|
||||
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
@@ -63,6 +72,7 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
min_len: minimum length of an input str; min_len = 4
|
||||
max_len: maximum length of an input str; max_len = 8
|
||||
allow_len: permit only specific lengths; allow_len = (8, 16, 32)
|
||||
numeric_base: indicate hex / decimal, if any; numeric_base = None; numeric_base = 10; numeric_base = 16
|
||||
|
||||
Subclasses may change the meaning of these by overriding validate_val(), for example that the length counts
|
||||
resulting bytes instead of a hexstring length. Most subclasses will be covered by the default validate_val().
|
||||
@@ -117,6 +127,8 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
max_len = None
|
||||
allow_len = None # a list of specific lengths
|
||||
example_input = None
|
||||
default_source = None # a param_source.ParamSource subclass
|
||||
numeric_base = None # or 10 or 16
|
||||
|
||||
def __init__(self, input_value=None):
|
||||
self.input_value = input_value # the raw input value as given by caller
|
||||
@@ -178,19 +190,28 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
if cls.allow_chars is not None:
|
||||
if any(c not in cls.allow_chars for c in val):
|
||||
raise ValueError(f"invalid characters in input value {val!r}, valid chars are {cls.allow_chars}")
|
||||
elif isinstance(val, io.BytesIO):
|
||||
val = val.getvalue()
|
||||
|
||||
if hasattr(val, '__len__'):
|
||||
val_len = len(val)
|
||||
else:
|
||||
# e.g. int length
|
||||
val_len = len(str(val))
|
||||
|
||||
if cls.allow_len is not None:
|
||||
l = cls.allow_len
|
||||
# cls.allow_len could be one int, or a tuple of ints. Wrap a single int also in a tuple.
|
||||
if not isinstance(l, (tuple, list)):
|
||||
l = (l,)
|
||||
if len(val) not in l:
|
||||
raise ValueError(f'length must be one of {cls.allow_len}, not {len(val)}: {val!r}')
|
||||
if val_len not in l:
|
||||
raise ValueError(f'length must be one of {cls.allow_len}, not {val_len}: {val!r}')
|
||||
if cls.min_len is not None:
|
||||
if len(val) < cls.min_len:
|
||||
raise ValueError(f'length must be at least {cls.min_len}, not {len(val)}: {val!r}')
|
||||
if val_len < cls.min_len:
|
||||
raise ValueError(f'length must be at least {cls.min_len}, not {val_len}: {val!r}')
|
||||
if cls.max_len is not None:
|
||||
if len(val) > cls.max_len:
|
||||
raise ValueError(f'length must be at most {cls.max_len}, not {len(val)}: {val!r}')
|
||||
if val_len > cls.max_len:
|
||||
raise ValueError(f'length must be at most {cls.max_len}, not {val_len}: {val!r}')
|
||||
return val
|
||||
|
||||
@classmethod
|
||||
@@ -199,6 +220,49 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
Write the given val in the right format in all the right places in pes."""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def get_value_from_pes(cls, pes: ProfileElementSequence):
|
||||
"""Same as get_values_from_pes() but expecting a single value.
|
||||
get_values_from_pes() may return values like this:
|
||||
[{ 'AlgorithmID': 'Milenage' }, { 'AlgorithmID': 'Milenage' }]
|
||||
This ensures that all these entries are identical and would return only
|
||||
{ 'AlgorithmID': 'Milenage' }.
|
||||
|
||||
This is relevant for any profile element that may appear multiple times in the same PES (only a few),
|
||||
where each occurrence should reflect the same value (all currently known parameters).
|
||||
"""
|
||||
|
||||
val = None
|
||||
for v in cls.get_values_from_pes(pes):
|
||||
if val is None:
|
||||
val = v
|
||||
elif val != v:
|
||||
raise ValueError(f'get_value_from_pes(): got distinct values: {val!r} != {v!r}')
|
||||
return val
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence) -> Generator:
|
||||
"""This is what subclasses implement: yield all values from a decoded profile package.
|
||||
Find all values in the pes, and yield them decoded to a valid cls.input_value format.
|
||||
Should be a generator function, i.e. use 'yield' instead of 'return'.
|
||||
|
||||
Yielded value must be a dict(). Usually, an implementation will return only one key, like
|
||||
|
||||
{ "ICCID": "1234567890123456789" }
|
||||
|
||||
Some implementations have more than one value to return, like
|
||||
|
||||
{ "IMSI": "00101012345678", "IMSI-ACC" : "5" }
|
||||
|
||||
Implementation example:
|
||||
|
||||
for pe in pes:
|
||||
if my_condition(pe):
|
||||
yield { cls.name: b2h(my_bin_value_from(pe)) }
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def get_len_range(cls):
|
||||
"""considering all of min_len, max_len and allow_len, get a tuple of the resulting (min, max) of permitted
|
||||
@@ -219,6 +283,20 @@ class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
return (None, None)
|
||||
return (min(vals), max(vals))
|
||||
|
||||
@classmethod
|
||||
def get_typical_input_len(cls):
|
||||
'''return a good length to use as the visible width of a user interface input field.
|
||||
May be overridden by subclasses.
|
||||
This default implementation returns the maximum allowed value length -- a good fit for most subclasses.
|
||||
'''
|
||||
return cls.get_len_range()[1] or 16
|
||||
|
||||
@classmethod
|
||||
def is_super_of(cls, other_class):
|
||||
try:
|
||||
return issubclass(other_class, cls)
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
class DecimalParam(ConfigurableParameter):
|
||||
"""Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of
|
||||
@@ -226,6 +304,7 @@ class DecimalParam(ConfigurableParameter):
|
||||
"""
|
||||
allow_types = (str, int)
|
||||
allow_chars = '0123456789'
|
||||
numeric_base = 10
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
@@ -256,9 +335,21 @@ class DecimalHexParam(DecimalParam):
|
||||
# a DecimalHexParam subclass expects the apply_val() input to be a bytes instance ready for the pes
|
||||
return h2b(val)
|
||||
|
||||
@classmethod
|
||||
def decimal_hex_to_str(cls, val):
|
||||
'useful for get_values_from_pes() implementations of subclasses'
|
||||
if isinstance(val, bytes):
|
||||
val = b2h(val)
|
||||
assert isinstance(val, hexstr)
|
||||
if cls.rpad is not None:
|
||||
c = cls.rpad_char or 'f'
|
||||
val = unrpad(val, c)
|
||||
return val.to_bytes().decode('ascii')
|
||||
|
||||
class IntegerParam(ConfigurableParameter):
|
||||
allow_types = (str, int)
|
||||
allow_chars = '0123456789'
|
||||
numeric_base = 10
|
||||
|
||||
# two integers, if the resulting int should be range limited
|
||||
min_val = None
|
||||
@@ -279,14 +370,28 @@ class IntegerParam(ConfigurableParameter):
|
||||
raise ValueError(f'Value {val} is out of range, must be [{cls.min_val}..{cls.max_val}]')
|
||||
return val
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for valdict in super().get_values_from_pes(pes):
|
||||
for key, val in valdict.items():
|
||||
if isinstance(val, int):
|
||||
valdict[key] = str(val)
|
||||
yield valdict
|
||||
|
||||
class BinaryParam(ConfigurableParameter):
|
||||
allow_types = (str, io.BytesIO, bytes, bytearray)
|
||||
allow_types = (str, io.BytesIO, bytes, bytearray, int)
|
||||
allow_chars = '0123456789abcdefABCDEF'
|
||||
strip_chars = ' \t\r\n'
|
||||
numeric_base = 16
|
||||
default_source = param_source.RandomHexDigitSource
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
# take care that min_len and max_len are applied to the binary length by converting to bytes first
|
||||
if isinstance(val, int):
|
||||
min_len, _max_len = cls.get_len_range()
|
||||
val = '%0*d' % (min_len, val)
|
||||
|
||||
if isinstance(val, str):
|
||||
if cls.strip_chars is not None:
|
||||
val = ''.join(c for c in val if c not in cls.strip_chars)
|
||||
@@ -301,6 +406,80 @@ class BinaryParam(ConfigurableParameter):
|
||||
val = super().validate_val(val)
|
||||
return bytes(val)
|
||||
|
||||
@classmethod
|
||||
def get_typical_input_len(cls):
|
||||
# override to return twice the length, because of hex digits.
|
||||
min_len, max_len = cls.get_len_range()
|
||||
if max_len is None:
|
||||
return None
|
||||
# two hex characters per value octet.
|
||||
# (maybe *3 to also allow for spaces?)
|
||||
return max_len * 2
|
||||
|
||||
|
||||
class EnumParam(ConfigurableParameter):
|
||||
value_map = {
|
||||
# For example:
|
||||
#'Meaningful label for value 23': 0x23,
|
||||
# Where 0x23 is a valid value to use for apply_val().
|
||||
}
|
||||
_value_map_reverse = None
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
orig_val = val
|
||||
enum_val = None
|
||||
if isinstance(val, str):
|
||||
enum_name = val
|
||||
enum_val = cls.map_name_to_val(enum_name)
|
||||
|
||||
# if the str is not one of the known value_map.keys(), is it maybe one of value_map.keys()?
|
||||
if enum_val is None and val in cls.value_map.values():
|
||||
enum_val = val
|
||||
|
||||
if enum_val not in cls.value_map.values():
|
||||
raise ValueError(f"{cls.get_name()}: invalid argument: {orig_val!r}. Valid arguments are:"
|
||||
f" {', '.join(cls.value_map.keys())}")
|
||||
|
||||
return enum_val
|
||||
|
||||
@classmethod
|
||||
def map_name_to_val(cls, name:str, strict=True):
|
||||
val = cls.value_map.get(name)
|
||||
if val is not None:
|
||||
return val
|
||||
|
||||
clean_name = cls.clean_name_str(name)
|
||||
for k, v in cls.value_map.items():
|
||||
if clean_name == cls.clean_name_str(k):
|
||||
return v
|
||||
|
||||
if strict:
|
||||
raise ValueError(f"Problem in {cls.get_name()}: {name!r} is not a known value."
|
||||
f" Known values are: {cls.value_map.keys()!r}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def map_val_to_name(cls, val, strict=False) -> str:
|
||||
if cls._value_map_reverse is None:
|
||||
cls._value_map_reverse = dict((v, k) for k, v in cls.value_map.items())
|
||||
|
||||
name = cls._value_map_reverse.get(val)
|
||||
if name:
|
||||
return name
|
||||
if strict:
|
||||
raise ValueError(f"Problem in {cls.get_name()}: {val!r} ({type(val)}) is not a known value."
|
||||
f" Known values are: {cls.value_map.values()!r}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def name_normalize(cls, name:str) -> str:
|
||||
return cls.map_val_to_name(cls.map_name_to_val(name))
|
||||
|
||||
@classmethod
|
||||
def clean_name_str(cls, val):
|
||||
return re.sub('[^0-9A-Za-z-_]', '', val).lower()
|
||||
|
||||
|
||||
class Iccid(DecimalParam):
|
||||
"""ICCID Parameter. Input: string of decimal digits.
|
||||
@@ -309,6 +488,7 @@ class Iccid(DecimalParam):
|
||||
min_len = 18
|
||||
max_len = 20
|
||||
example_input = '998877665544332211'
|
||||
default_source = param_source.IncDigitSource
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
@@ -322,6 +502,17 @@ class Iccid(DecimalParam):
|
||||
# patch MF/EF.ICCID
|
||||
file_replace_content(pes.get_pe_for_type('mf').decoded['ef-iccid'], h2b(enc_iccid(val)))
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
padded = b2h(pes.get_pe_for_type('header').decoded['iccid'])
|
||||
iccid = unrpad(padded)
|
||||
yield { cls.name: iccid }
|
||||
|
||||
for pe in pes.get_pes_for_type('mf'):
|
||||
iccid_f = pe.files.get('ef-iccid', None)
|
||||
if iccid_f is not None:
|
||||
yield { cls.name: dec_iccid(b2h(iccid_f.body)) }
|
||||
|
||||
class Imsi(DecimalParam):
|
||||
"""Configurable IMSI. Expects value to be a string of digits. Automatically sets the ACC to
|
||||
the last digit of the IMSI."""
|
||||
@@ -330,6 +521,7 @@ class Imsi(DecimalParam):
|
||||
min_len = 6
|
||||
max_len = 15
|
||||
example_input = '00101' + ('0' * 10)
|
||||
default_source = param_source.IncDigitSource
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
@@ -342,6 +534,18 @@ class Imsi(DecimalParam):
|
||||
file_replace_content(pe.decoded['ef-acc'], acc.to_bytes(2, 'big'))
|
||||
# TODO: DF.GSM_ACCESS if not linked?
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
imsi_f = pe.files.get('ef-imsi', None)
|
||||
acc_f = pe.files.get('ef-acc', None)
|
||||
y = {}
|
||||
if imsi_f:
|
||||
y[cls.name] = dec_imsi(b2h(imsi_f.body))
|
||||
if acc_f:
|
||||
y[cls.name + '-ACC'] = b2h(acc_f.body)
|
||||
yield y
|
||||
|
||||
class SmspTpScAddr(ConfigurableParameter):
|
||||
"""Configurable SMSC (SMS Service Centre) TP-SC-ADDR. Expects to be a phone number in national or
|
||||
international format (designated by a leading +). Automatically sets the NPI to E.164 and the TON based on
|
||||
@@ -350,25 +554,45 @@ class SmspTpScAddr(ConfigurableParameter):
|
||||
name = 'SMSP-TP-SC-ADDR'
|
||||
allow_chars = '+0123456789'
|
||||
strip_chars = ' \t\r\n'
|
||||
numeric_base = 10
|
||||
max_len = 21 # '+' and 20 digits
|
||||
min_len = 1
|
||||
example_input = '+49301234567'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
addr_str = str(val)
|
||||
@staticmethod
|
||||
def str_to_tuple(addr_str):
|
||||
if addr_str[0] == '+':
|
||||
digits = addr_str[1:]
|
||||
international = True
|
||||
else:
|
||||
digits = addr_str
|
||||
international = False
|
||||
return (international, digits)
|
||||
|
||||
@staticmethod
|
||||
def tuple_to_str(addr_tuple):
|
||||
international, digits = addr_tuple
|
||||
if international:
|
||||
ret = '+'
|
||||
else:
|
||||
ret = ''
|
||||
ret += digits
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
|
||||
addr_tuple = cls.str_to_tuple(str(val))
|
||||
|
||||
international, digits = addr_tuple
|
||||
if len(digits) > 20:
|
||||
raise ValueError(f'TP-SC-ADDR must not exceed 20 digits: {digits!r}')
|
||||
if not digits.isdecimal():
|
||||
raise ValueError(f'TP-SC-ADDR must only contain decimal digits: {digits!r}')
|
||||
return (international, digits)
|
||||
|
||||
return addr_tuple
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
@@ -398,92 +622,237 @@ class SmspTpScAddr(ConfigurableParameter):
|
||||
# re-generate the pe.decoded member from the File instance
|
||||
pe.file2pe(f_smsp)
|
||||
|
||||
class SdKey(BinaryParam, metaclass=ClassVarMeta):
|
||||
"""Configurable Security Domain (SD) Key. Value is presented as bytes."""
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
f_smsp = pe.files.get('ef-smsp', None)
|
||||
if f_smsp is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
ef_smsp = EF_SMSP()
|
||||
ef_smsp_dec = ef_smsp.decode_record_bin(f_smsp.body, 1)
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
tp_sc_addr = ef_smsp_dec.get('tp_sc_addr', None)
|
||||
|
||||
digits = tp_sc_addr.get('call_number', None)
|
||||
|
||||
ton_npi = tp_sc_addr.get('ton_npi', None)
|
||||
international = ton_npi.get('type_of_number', None)
|
||||
international = (international == 'international')
|
||||
|
||||
yield { cls.name: cls.tuple_to_str((international, digits)) }
|
||||
|
||||
|
||||
class SdKey(BinaryParam):
|
||||
"""Configurable Security Domain (SD) Key. Value is presented as bytes.
|
||||
Non-abstract implementations are generated in SdKey.generate_sd_key_classes"""
|
||||
# these will be set by subclasses
|
||||
key_type = None
|
||||
key_id = None
|
||||
kvn = None
|
||||
reserved_kvn = tuple() # tuple of all reserved kvn for a given SCPxx
|
||||
key_id = None
|
||||
key_usage_qual = None
|
||||
|
||||
@classmethod
|
||||
def _apply_sd(cls, pe: ProfileElement, value):
|
||||
assert pe.type == 'securityDomain'
|
||||
for key in pe.decoded['keyList']:
|
||||
if key['keyIdentifier'][0] == cls.key_id and key['keyVersionNumber'][0] == cls.kvn:
|
||||
assert len(key['keyComponents']) == 1
|
||||
key['keyComponents'][0]['keyData'] = value
|
||||
return
|
||||
# Could not find matching key to patch, create a new one
|
||||
key = {
|
||||
'keyUsageQualifier': bytes([cls.key_usage_qual]),
|
||||
'keyIdentifier': bytes([cls.key_id]),
|
||||
'keyVersionNumber': bytes([cls.kvn]),
|
||||
'keyComponents': [
|
||||
{ 'keyType': bytes([cls.key_type]), 'keyData': value },
|
||||
]
|
||||
}
|
||||
pe.decoded['keyList'].append(key)
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
set_components = [ SecurityDomainKeyComponent(cls.key_type, val) ]
|
||||
|
||||
for pe in pes.pe_list:
|
||||
if pe.type != 'securityDomain':
|
||||
continue
|
||||
assert isinstance(pe, ProfileElementSD)
|
||||
|
||||
key = pe.find_key(key_version_number=cls.kvn, key_id=cls.key_id)
|
||||
if not key:
|
||||
# Could not find matching key to patch, create a new one
|
||||
key = SecurityDomainKey(
|
||||
key_version_number=cls.kvn,
|
||||
key_id=cls.key_id,
|
||||
key_usage_qualifier=cls.key_usage_qual,
|
||||
key_components=set_components,
|
||||
)
|
||||
pe.add_key(key)
|
||||
else:
|
||||
# A key of this KVN and ID already exists in the profile.
|
||||
|
||||
# Keep the key_usage_qualifier as it was in the profile, so skip this here:
|
||||
# key.key_usage_qualifier = cls.key_usage_qual
|
||||
|
||||
key.key_components = set_components
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, value):
|
||||
for pe in pes.get_pes_for_type('securityDomain'):
|
||||
cls._apply_sd(pe, value)
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.pe_list:
|
||||
if pe.type != 'securityDomain':
|
||||
continue
|
||||
assert isinstance(pe, ProfileElementSD)
|
||||
|
||||
class SdKeyScp80_01(SdKey, kvn=0x01, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
||||
pass
|
||||
class SdKeyScp80_01Kic(SdKeyScp80_01, key_id=0x01, key_usage_qual=0x18): # FIXME: ordering?
|
||||
pass
|
||||
class SdKeyScp80_01Kid(SdKeyScp80_01, key_id=0x02, key_usage_qual=0x14):
|
||||
pass
|
||||
class SdKeyScp80_01Kik(SdKeyScp80_01, key_id=0x03, key_usage_qual=0x48):
|
||||
pass
|
||||
|
||||
class SdKeyScp81_01(SdKey, kvn=0x81): # FIXME
|
||||
pass
|
||||
class SdKeyScp81_01Psk(SdKeyScp81_01, key_id=0x01, key_type=0x85, key_usage_qual=0x3C):
|
||||
pass
|
||||
class SdKeyScp81_01Dek(SdKeyScp81_01, key_id=0x02, key_type=0x88, key_usage_qual=0x48):
|
||||
pass
|
||||
|
||||
class SdKeyScp02_20(SdKey, kvn=0x20, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
||||
pass
|
||||
class SdKeyScp02_20Enc(SdKeyScp02_20, key_id=0x01, key_usage_qual=0x18):
|
||||
pass
|
||||
class SdKeyScp02_20Mac(SdKeyScp02_20, key_id=0x02, key_usage_qual=0x14):
|
||||
pass
|
||||
class SdKeyScp02_20Dek(SdKeyScp02_20, key_id=0x03, key_usage_qual=0x48):
|
||||
pass
|
||||
|
||||
class SdKeyScp03_30(SdKey, kvn=0x30, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
||||
pass
|
||||
class SdKeyScp03_30Enc(SdKeyScp03_30, key_id=0x01, key_usage_qual=0x18):
|
||||
pass
|
||||
class SdKeyScp03_30Mac(SdKeyScp03_30, key_id=0x02, key_usage_qual=0x14):
|
||||
pass
|
||||
class SdKeyScp03_30Dek(SdKeyScp03_30, key_id=0x03, key_usage_qual=0x48):
|
||||
pass
|
||||
|
||||
class SdKeyScp03_31(SdKey, kvn=0x31, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
||||
pass
|
||||
class SdKeyScp03_31Enc(SdKeyScp03_31, key_id=0x01, key_usage_qual=0x18):
|
||||
pass
|
||||
class SdKeyScp03_31Mac(SdKeyScp03_31, key_id=0x02, key_usage_qual=0x14):
|
||||
pass
|
||||
class SdKeyScp03_31Dek(SdKeyScp03_31, key_id=0x03, key_usage_qual=0x48):
|
||||
pass
|
||||
|
||||
class SdKeyScp03_32(SdKey, kvn=0x32, key_type=0x88, permitted_len=[16,24,32]): # AES key type
|
||||
pass
|
||||
class SdKeyScp03_32Enc(SdKeyScp03_32, key_id=0x01, key_usage_qual=0x18):
|
||||
pass
|
||||
class SdKeyScp03_32Mac(SdKeyScp03_32, key_id=0x02, key_usage_qual=0x14):
|
||||
pass
|
||||
class SdKeyScp03_32Dek(SdKeyScp03_32, key_id=0x03, key_usage_qual=0x48):
|
||||
pass
|
||||
key = pe.find_key(key_version_number=cls.kvn, key_id=cls.key_id)
|
||||
if not key:
|
||||
continue
|
||||
kc = key.get_key_component(cls.key_type)
|
||||
if kc:
|
||||
yield { cls.name: b2h(kc) }
|
||||
|
||||
|
||||
NO_OP = (('', {}))
|
||||
|
||||
LEN_128 = (16,)
|
||||
LEN_128_192_256 = (16, 24, 32)
|
||||
LEN_128_256 = (16, 32)
|
||||
|
||||
DES = ('DES', dict(key_type=KeyType.des, allow_len=LEN_128) )
|
||||
AES = ('AES', dict(key_type=KeyType.aes, allow_len=LEN_128_192_256) )
|
||||
|
||||
ENC = ('ENC', dict(key_id=0x01, key_usage_qual=0x18) )
|
||||
MAC = ('MAC', dict(key_id=0x02, key_usage_qual=0x14) )
|
||||
DEK = ('DEK', dict(key_id=0x03, key_usage_qual=0x48) )
|
||||
|
||||
TLSPSK_PSK = ('TLSPSK', dict(key_type=KeyType.tls_psk, key_id=0x01, key_usage_qual=0x3c, allow_len=LEN_128_192_256) )
|
||||
TLSPSK_DEK = ('DEK', dict(key_id=0x02, key_usage_qual=0x48) )
|
||||
|
||||
# THIS IS THE LIST that controls which SdKeyXxx subclasses exist:
|
||||
SD_KEY_DEFS = (
|
||||
# name KVN x variants x variants
|
||||
('SCP02', (0x20, 0x21, 0x22, 0xff), (AES, ), (ENC, MAC, DEK) ),
|
||||
('SCP03', (0x30, 0x31, 0x32), (AES, ), (ENC, MAC, DEK) ),
|
||||
('SCP80', (0x01, 0x02, 0x03), (DES, AES), (ENC, MAC, DEK) ),
|
||||
|
||||
# key_id=1
|
||||
('SCP81', (0x40, 0x41, 0x42), (TLSPSK_PSK, ), ),
|
||||
# key_id=2
|
||||
('SCP81', (0x40, 0x41, 0x42), (DES, AES), (TLSPSK_DEK, ) ),
|
||||
)
|
||||
|
||||
all_implementations = None
|
||||
|
||||
@classmethod
|
||||
def generate_sd_key_classes(cls, sd_key_defs=SD_KEY_DEFS):
|
||||
'''This generates python classes to be exported in this module, as subclasses of class SdKey.
|
||||
|
||||
We create SdKey subclasses dynamically from a list.
|
||||
You can list all of them via:
|
||||
from pySim.esim.saip.personalization import SdKey
|
||||
SdKey.all_implementations
|
||||
or
|
||||
print('\n'.join(sorted(f'{x.__name__}\t{x.name}' for x in SdKey.all_implementations)))
|
||||
|
||||
at time of writing this comment, this prints:
|
||||
|
||||
SdKeyScp02Kvn20AesDek SCP02-KVN20-AES-DEK
|
||||
SdKeyScp02Kvn20AesEnc SCP02-KVN20-AES-ENC
|
||||
SdKeyScp02Kvn20AesMac SCP02-KVN20-AES-MAC
|
||||
SdKeyScp02Kvn21AesDek SCP02-KVN21-AES-DEK
|
||||
SdKeyScp02Kvn21AesEnc SCP02-KVN21-AES-ENC
|
||||
SdKeyScp02Kvn21AesMac SCP02-KVN21-AES-MAC
|
||||
SdKeyScp02Kvn22AesDek SCP02-KVN22-AES-DEK
|
||||
SdKeyScp02Kvn22AesEnc SCP02-KVN22-AES-ENC
|
||||
SdKeyScp02Kvn22AesMac SCP02-KVN22-AES-MAC
|
||||
SdKeyScp02KvnffAesDek SCP02-KVNff-AES-DEK
|
||||
SdKeyScp02KvnffAesEnc SCP02-KVNff-AES-ENC
|
||||
SdKeyScp02KvnffAesMac SCP02-KVNff-AES-MAC
|
||||
SdKeyScp03Kvn30AesDek SCP03-KVN30-AES-DEK
|
||||
SdKeyScp03Kvn30AesEnc SCP03-KVN30-AES-ENC
|
||||
SdKeyScp03Kvn30AesMac SCP03-KVN30-AES-MAC
|
||||
SdKeyScp03Kvn31AesDek SCP03-KVN31-AES-DEK
|
||||
SdKeyScp03Kvn31AesEnc SCP03-KVN31-AES-ENC
|
||||
SdKeyScp03Kvn31AesMac SCP03-KVN31-AES-MAC
|
||||
SdKeyScp03Kvn32AesDek SCP03-KVN32-AES-DEK
|
||||
SdKeyScp03Kvn32AesEnc SCP03-KVN32-AES-ENC
|
||||
SdKeyScp03Kvn32AesMac SCP03-KVN32-AES-MAC
|
||||
SdKeyScp80Kvn01AesDek SCP80-KVN01-AES-DEK
|
||||
SdKeyScp80Kvn01AesEnc SCP80-KVN01-AES-ENC
|
||||
SdKeyScp80Kvn01AesMac SCP80-KVN01-AES-MAC
|
||||
SdKeyScp80Kvn01DesDek SCP80-KVN01-DES-DEK
|
||||
SdKeyScp80Kvn01DesEnc SCP80-KVN01-DES-ENC
|
||||
SdKeyScp80Kvn01DesMac SCP80-KVN01-DES-MAC
|
||||
SdKeyScp80Kvn02AesDek SCP80-KVN02-AES-DEK
|
||||
SdKeyScp80Kvn02AesEnc SCP80-KVN02-AES-ENC
|
||||
SdKeyScp80Kvn02AesMac SCP80-KVN02-AES-MAC
|
||||
SdKeyScp80Kvn02DesDek SCP80-KVN02-DES-DEK
|
||||
SdKeyScp80Kvn02DesEnc SCP80-KVN02-DES-ENC
|
||||
SdKeyScp80Kvn02DesMac SCP80-KVN02-DES-MAC
|
||||
SdKeyScp80Kvn03AesDek SCP80-KVN03-AES-DEK
|
||||
SdKeyScp80Kvn03AesEnc SCP80-KVN03-AES-ENC
|
||||
SdKeyScp80Kvn03AesMac SCP80-KVN03-AES-MAC
|
||||
SdKeyScp80Kvn03DesDek SCP80-KVN03-DES-DEK
|
||||
SdKeyScp80Kvn03DesEnc SCP80-KVN03-DES-ENC
|
||||
SdKeyScp80Kvn03DesMac SCP80-KVN03-DES-MAC
|
||||
SdKeyScp81Kvn40AesDek SCP81-KVN40-AES-DEK
|
||||
SdKeyScp81Kvn40DesDek SCP81-KVN40-DES-DEK
|
||||
SdKeyScp81Kvn40Tlspsk SCP81-KVN40-TLSPSK
|
||||
SdKeyScp81Kvn41AesDek SCP81-KVN41-AES-DEK
|
||||
SdKeyScp81Kvn41DesDek SCP81-KVN41-DES-DEK
|
||||
SdKeyScp81Kvn41Tlspsk SCP81-KVN41-TLSPSK
|
||||
SdKeyScp81Kvn42AesDek SCP81-KVN42-AES-DEK
|
||||
SdKeyScp81Kvn42DesDek SCP81-KVN42-DES-DEK
|
||||
SdKeyScp81Kvn42Tlspsk SCP81-KVN42-TLSPSK
|
||||
'''
|
||||
|
||||
SdKey.all_implementations = []
|
||||
|
||||
def camel(s):
|
||||
return s[:1].upper() + s[1:].lower()
|
||||
|
||||
def do_variants(name, kvn, remaining_variants, labels=[], attrs={}):
|
||||
'recurse to unfold as many variants as there may be'
|
||||
if remaining_variants:
|
||||
# not a leaf node, collect more labels and attrs
|
||||
variants = remaining_variants[0]
|
||||
remaining_variants = remaining_variants[1:]
|
||||
|
||||
for label, valdict in variants:
|
||||
# pass copies to recursion
|
||||
inner_labels = list(labels)
|
||||
inner_attrs = dict(attrs)
|
||||
|
||||
inner_labels.append(label)
|
||||
inner_attrs.update(valdict)
|
||||
do_variants(name, kvn, remaining_variants,
|
||||
labels=inner_labels,
|
||||
attrs=inner_attrs)
|
||||
return
|
||||
|
||||
# leaf node. create a new class with all the accumulated vals
|
||||
parts = [name, f'KVN{kvn:02x}',] + labels
|
||||
cls_label = '-'.join(p for p in parts if p)
|
||||
|
||||
parts = ['Sd', 'Key', name, f'Kvn{kvn:02x}'] + labels
|
||||
clsname = ''.join(camel(p) for p in parts)
|
||||
|
||||
max_key_len = attrs.get('allow_len')[-1]
|
||||
|
||||
attrs.update({
|
||||
'name' : cls_label,
|
||||
'kvn': kvn,
|
||||
'example_input': f'00*{max_key_len}',
|
||||
})
|
||||
|
||||
# below line is like
|
||||
# class SdKeyScpNNKvnXXYyyZzz(SdKey):
|
||||
# <set attrs>
|
||||
cls_def = type(clsname, (cls,), attrs)
|
||||
|
||||
# for some unknown reason, subclassing from abc.ABC makes cls_def.__module__ == 'abc',
|
||||
# but we don't want 'abc.SdKeyScp03Kvn32AesEnc'.
|
||||
# Make sure it is 'pySim.esim.saip.personalization.SdKeyScp03Kvn32AesEnc'
|
||||
cls_def.__module__ = __name__
|
||||
|
||||
globals()[clsname] = cls_def
|
||||
SdKey.all_implementations.append(cls_def)
|
||||
|
||||
|
||||
for items in sd_key_defs:
|
||||
name, kvns = items[:2]
|
||||
variants = items[2:]
|
||||
for kvn in kvns:
|
||||
do_variants(name, kvn, variants)
|
||||
|
||||
# this creates all of the classes named like SdKeyScp02Kvn20AesDek to be published in this python module:
|
||||
SdKey.generate_sd_key_classes()
|
||||
|
||||
def obtain_all_pe_from_pelist(l: List[ProfileElement], wanted_type: str) -> ProfileElement:
|
||||
return (pe for pe in l if pe.type == wanted_type)
|
||||
@@ -502,7 +871,8 @@ class Puk(DecimalHexParam):
|
||||
allow_len = 8
|
||||
rpad = 16
|
||||
keyReference = None
|
||||
example_input = '0' * allow_len
|
||||
example_input = f'0*{allow_len}'
|
||||
default_source = param_source.RandomDigitSource
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
@@ -516,6 +886,14 @@ class Puk(DecimalHexParam):
|
||||
raise ValueError("input template UPP has unexpected structure:"
|
||||
f" cannot find pukCode with keyReference={cls.keyReference}")
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
mf_pes = pes.pes_by_naa['mf'][0]
|
||||
for pukCodes in obtain_all_pe_from_pelist(mf_pes, 'pukCodes'):
|
||||
for pukCode in pukCodes.decoded['pukCodes']:
|
||||
if pukCode['keyReference'] == cls.keyReference:
|
||||
yield { cls.name: cls.decimal_hex_to_str(pukCode['pukValue']) }
|
||||
|
||||
class Puk1(Puk):
|
||||
name = 'PUK1'
|
||||
keyReference = 0x01
|
||||
@@ -529,7 +907,8 @@ class Pin(DecimalHexParam):
|
||||
rpad = 16
|
||||
min_len = 4
|
||||
max_len = 8
|
||||
example_input = '0' * max_len
|
||||
example_input = f'0*{max_len}'
|
||||
default_source = param_source.RandomDigitSource
|
||||
keyReference = None
|
||||
|
||||
@staticmethod
|
||||
@@ -551,9 +930,24 @@ class Pin(DecimalHexParam):
|
||||
raise ValueError('input template UPP has unexpected structure:'
|
||||
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference}')
|
||||
|
||||
@classmethod
|
||||
def _read_all_pinvalues_from_pe(cls, pe: ProfileElement):
|
||||
"This is a separate function because subclasses may feed different pe arguments."
|
||||
for pinCodes in obtain_all_pe_from_pelist(pe, 'pinCodes'):
|
||||
if pinCodes.decoded['pinCodes'][0] != 'pinconfig':
|
||||
continue
|
||||
|
||||
for pinCode in pinCodes.decoded['pinCodes'][1]:
|
||||
if pinCode['keyReference'] == cls.keyReference:
|
||||
yield { cls.name: cls.decimal_hex_to_str(pinCode['pinValue']) }
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
yield from cls._read_all_pinvalues_from_pe(pes.pes_by_naa['mf'][0])
|
||||
|
||||
class Pin1(Pin):
|
||||
name = 'PIN1'
|
||||
example_input = '0' * 4 # PIN are usually 4 digits
|
||||
example_input = '0*4' # PIN are usually 4 digits
|
||||
keyReference = 0x01
|
||||
|
||||
class Pin2(Pin1):
|
||||
@@ -572,6 +966,14 @@ class Pin2(Pin1):
|
||||
raise ValueError('input template UPP has unexpected structure:'
|
||||
+ f' {cls.get_name()} cannot find pinCode with keyReference={cls.keyReference} in {naa=}')
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for naa in pes.pes_by_naa:
|
||||
if naa not in ['usim','isim','csim','telecom']:
|
||||
continue
|
||||
for pe in pes.pes_by_naa[naa]:
|
||||
yield from cls._read_all_pinvalues_from_pe(pe)
|
||||
|
||||
class Adm1(Pin):
|
||||
name = 'ADM1'
|
||||
keyReference = 0x0A
|
||||
@@ -596,26 +998,61 @@ class AlgoConfig(ConfigurableParameter):
|
||||
raise ValueError('input template UPP has unexpected structure:'
|
||||
f' {cls.__name__} cannot find algoParameter with key={cls.algo_config_key}')
|
||||
|
||||
class AlgorithmID(DecimalParam, AlgoConfig):
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('akaParameter'):
|
||||
algoConfiguration = pe.decoded['algoConfiguration']
|
||||
if len(algoConfiguration) < 2:
|
||||
continue
|
||||
if algoConfiguration[0] != 'algoParameter':
|
||||
continue
|
||||
if not algoConfiguration[1]:
|
||||
continue
|
||||
val = algoConfiguration[1].get(cls.algo_config_key, None)
|
||||
if val is None:
|
||||
continue
|
||||
if isinstance(val, bytes):
|
||||
val = b2h(val)
|
||||
# if it is an int (algorithmID), just pass thru as int
|
||||
yield { cls.name: val }
|
||||
|
||||
class AlgorithmID(EnumParam, AlgoConfig):
|
||||
'''use validate_val() from EnumParam, and apply_val() from AlgoConfig.
|
||||
In get_values_from_pes(), return enum value names, not raw values.'''
|
||||
name = "Algorithm"
|
||||
|
||||
# as in pySim/esim/asn1/saip/PE_Definitions-3.3.1.asn
|
||||
value_map = {
|
||||
"Milenage" : 1,
|
||||
"TUAK" : 2,
|
||||
"usim-test" : 3,
|
||||
}
|
||||
example_input = "Milenage"
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
algo_config_key = 'algorithmID'
|
||||
allow_len = 1
|
||||
example_input = 1 # Milenage
|
||||
|
||||
# EnumParam.validate_val() returns the int values from value_map
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
val = int(val)
|
||||
valid = (1, 2, 3)
|
||||
if val not in valid:
|
||||
raise ValueError(f'Invalid algorithmID {val!r}, must be one of {valid}')
|
||||
return val
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
# return enum names, not raw values.
|
||||
# use of super(): this intends to call AlgoConfig.get_values_from_pes() so that the cls argument is this cls
|
||||
# here (AlgorithmID); i.e. AlgoConfig.get_values_from_pes(pes) doesn't work, because AlgoConfig needs to look up
|
||||
# cls.algo_config_key.
|
||||
for d in super(cls, cls).get_values_from_pes(pes):
|
||||
if cls.name in d:
|
||||
# convert int to value string
|
||||
val = d[cls.name]
|
||||
d[cls.name] = cls.map_val_to_name(val, strict=True)
|
||||
yield d
|
||||
|
||||
class K(BinaryParam, AlgoConfig):
|
||||
"""use validate_val() from BinaryParam, and apply_val() from AlgoConfig"""
|
||||
name = 'K'
|
||||
algo_config_key = 'key'
|
||||
allow_len = (128 // 8, 256 // 8) # length in bytes (from BinaryParam); TUAK also allows 256 bit
|
||||
example_input = '00' * allow_len[0]
|
||||
example_input = f'00*{allow_len[0]}'
|
||||
|
||||
class Opc(K):
|
||||
name = 'OPc'
|
||||
@@ -629,6 +1066,7 @@ class MilenageRotationConstants(BinaryParam, AlgoConfig):
|
||||
algo_config_key = 'rotationConstants'
|
||||
allow_len = 5 # length in bytes (from BinaryParam)
|
||||
example_input = '40 00 20 40 60'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
@@ -659,6 +1097,7 @@ class MilenageXoringConstants(BinaryParam, AlgoConfig):
|
||||
' 00000000000000000000000000000002'
|
||||
' 00000000000000000000000000000004'
|
||||
' 00000000000000000000000000000008')
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
||||
"""Number of iterations of Keccak-f[1600] permutation as recomended by Section 7.2 of 3GPP TS 35.231"""
|
||||
@@ -667,3 +1106,4 @@ class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
||||
min_val = 1
|
||||
max_val = 255
|
||||
example_input = '1'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
@@ -91,6 +91,7 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
||||
|
||||
# Key Usage:
|
||||
# KVN 0x01 .. 0x0F reserved for SCP80
|
||||
# KVN 0x81 .. 0x8f reserved for SCP81
|
||||
# KVN 0x11 reserved for DAP specified in ETSI TS 102 226
|
||||
# KVN 0x20 .. 0x2F reserved for SCP02
|
||||
# KID 0x01 = ENC; 0x02 = MAC; 0x03 = DEK
|
||||
|
||||
1
tests/unittests/smdpp_data
Symbolic link
1
tests/unittests/smdpp_data
Symbolic link
@@ -0,0 +1 @@
|
||||
../../smdpp-data
|
||||
448
tests/unittests/test_configurable_parameters.py
Executable file
448
tests/unittests/test_configurable_parameters.py
Executable file
@@ -0,0 +1,448 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import sys
|
||||
import unittest
|
||||
import io
|
||||
from importlib import resources
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.esim.saip import ProfileElementSequence
|
||||
import pySim.esim.saip.personalization as p13n
|
||||
import smdpp_data.upp
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
def valstr(val):
|
||||
if isinstance(val, io.BytesIO):
|
||||
val = val.getvalue()
|
||||
if isinstance(val, bytearray):
|
||||
val = bytes(val)
|
||||
return f'{val!r}'
|
||||
|
||||
def valtypestr(val):
|
||||
if isinstance(val, dict):
|
||||
types = []
|
||||
for v in val.values():
|
||||
types.append(f'{type(v).__name__}')
|
||||
|
||||
val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
val_type = f'{type(val).__name__}'
|
||||
return f'{valstr(val)}:{val_type}'
|
||||
|
||||
class ConfigurableParameterTest(unittest.TestCase):
|
||||
|
||||
def test_parameters(self):
|
||||
|
||||
upp_fnames = (
|
||||
'TS48v5_SAIP2.1A_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_BERTLV_SUCI.der',
|
||||
'TS48v5_SAIP2.1B_NoBERTLV.der',
|
||||
'TS48v5_SAIP2.3_NoBERTLV.der',
|
||||
)
|
||||
|
||||
class Paramtest:
|
||||
def __init__(self, param_cls, val, expect_val, expect_clean_val=None):
|
||||
self.param_cls = param_cls
|
||||
self.val = val
|
||||
self.expect_clean_val = expect_clean_val
|
||||
self.expect_val = expect_val
|
||||
|
||||
param_tests = [
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456',
|
||||
expect_clean_val=str('123456'),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456),
|
||||
expect_val={'IMSI': hexstr('123456'),
|
||||
'IMSI-ACC': '0040'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Imsi, val='123456789012345',
|
||||
expect_clean_val=str('123456789012345'),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
Paramtest(param_cls=p13n.Imsi, val=int(123456789012345),
|
||||
expect_val={'IMSI': hexstr('123456789012345'),
|
||||
'IMSI-ACC': '0020'}),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Puk1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Puk2,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(1234),
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Pin1,
|
||||
val=int(12345678),
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='1234',
|
||||
expect_clean_val=b'1234\xff\xff\xff\xff',
|
||||
expect_val='1234'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='123456',
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val='12345678',
|
||||
expect_clean_val=b'12345678',
|
||||
expect_val='12345678'),
|
||||
Paramtest(param_cls=p13n.Adm1,
|
||||
val=int(123456),
|
||||
expect_clean_val=b'123456\xff\xff',
|
||||
expect_val='123456'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='Milenage',
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='TUAK',
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val='usim-test',
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=1,
|
||||
expect_clean_val=1,
|
||||
expect_val='Milenage'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=2,
|
||||
expect_clean_val=2,
|
||||
expect_val='TUAK'),
|
||||
Paramtest(param_cls=p13n.AlgorithmID,
|
||||
val=3,
|
||||
expect_clean_val=3,
|
||||
expect_val='usim-test'),
|
||||
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.K,
|
||||
val=int(11020304050607080910111213141516),
|
||||
expect_clean_val=b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='11020304050607080910111213141516'),
|
||||
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val='01020304050607080910111213141516',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
Paramtest(param_cls=p13n.Opc,
|
||||
val=io.BytesIO(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'),
|
||||
expect_clean_val=b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16',
|
||||
expect_val='01020304050607080910111213141516'),
|
||||
|
||||
Paramtest(param_cls=p13n.SmspTpScAddr,
|
||||
val='+1234567',
|
||||
expect_clean_val=(True, '1234567'),
|
||||
expect_val='+1234567'),
|
||||
Paramtest(param_cls=p13n.SmspTpScAddr,
|
||||
val=1234567,
|
||||
expect_clean_val=(False, '1234567'),
|
||||
expect_val='1234567'),
|
||||
|
||||
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
|
||||
val='123',
|
||||
expect_clean_val=123,
|
||||
expect_val='123'),
|
||||
Paramtest(param_cls=p13n.TuakNumberOfKeccak,
|
||||
val=123,
|
||||
expect_clean_val=123,
|
||||
expect_val='123'),
|
||||
|
||||
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||
val='0a 0b 0c 01 02',
|
||||
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||
expect_val='0a0b0c0102'),
|
||||
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||
val=b'\x0a\x0b\x0c\x01\x02',
|
||||
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||
expect_val='0a0b0c0102'),
|
||||
Paramtest(param_cls=p13n.MilenageRotationConstants,
|
||||
val=bytearray(b'\x0a\x0b\x0c\x01\x02'),
|
||||
expect_clean_val=b'\x0a\x0b\x0c\x01\x02',
|
||||
expect_val='0a0b0c0102'),
|
||||
|
||||
Paramtest(param_cls=p13n.MilenageXoringConstants,
|
||||
val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||
' bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||
' cccccccccccccccccccccccccccccccc'
|
||||
' 11111111111111111111111111111111'
|
||||
' 22222222222222222222222222222222',
|
||||
expect_clean_val=b'\xaa' * 16
|
||||
+ b'\xbb' * 16
|
||||
+ b'\xcc' * 16
|
||||
+ b'\x11' * 16
|
||||
+ b'\x22' * 16,
|
||||
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||
'cccccccccccccccccccccccccccccccc'
|
||||
'11111111111111111111111111111111'
|
||||
'22222222222222222222222222222222'),
|
||||
Paramtest(param_cls=p13n.MilenageXoringConstants,
|
||||
val=b'\xaa' * 16
|
||||
+ b'\xbb' * 16
|
||||
+ b'\xcc' * 16
|
||||
+ b'\x11' * 16
|
||||
+ b'\x22' * 16,
|
||||
expect_clean_val=b'\xaa' * 16
|
||||
+ b'\xbb' * 16
|
||||
+ b'\xcc' * 16
|
||||
+ b'\x11' * 16
|
||||
+ b'\x22' * 16,
|
||||
expect_val='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||
'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
|
||||
'cccccccccccccccccccccccccccccccc'
|
||||
'11111111111111111111111111111111'
|
||||
'22222222222222222222222222222222'),
|
||||
|
||||
]
|
||||
|
||||
for sdkey_cls in (
|
||||
# thin out the number of tests, as a compromise between completeness and test runtime
|
||||
p13n.SdKeyScp02Kvn20AesDek,
|
||||
#p13n.SdKeyScp02Kvn20AesEnc,
|
||||
#p13n.SdKeyScp02Kvn20AesMac,
|
||||
#p13n.SdKeyScp02Kvn21AesDek,
|
||||
p13n.SdKeyScp02Kvn21AesEnc,
|
||||
#p13n.SdKeyScp02Kvn21AesMac,
|
||||
#p13n.SdKeyScp02Kvn22AesDek,
|
||||
#p13n.SdKeyScp02Kvn22AesEnc,
|
||||
p13n.SdKeyScp02Kvn22AesMac,
|
||||
#p13n.SdKeyScp02KvnffAesDek,
|
||||
#p13n.SdKeyScp02KvnffAesEnc,
|
||||
#p13n.SdKeyScp02KvnffAesMac,
|
||||
p13n.SdKeyScp03Kvn30AesDek,
|
||||
#p13n.SdKeyScp03Kvn30AesEnc,
|
||||
#p13n.SdKeyScp03Kvn30AesMac,
|
||||
#p13n.SdKeyScp03Kvn31AesDek,
|
||||
p13n.SdKeyScp03Kvn31AesEnc,
|
||||
#p13n.SdKeyScp03Kvn31AesMac,
|
||||
#p13n.SdKeyScp03Kvn32AesDek,
|
||||
#p13n.SdKeyScp03Kvn32AesEnc,
|
||||
p13n.SdKeyScp03Kvn32AesMac,
|
||||
#p13n.SdKeyScp80Kvn01AesDek,
|
||||
#p13n.SdKeyScp80Kvn01AesEnc,
|
||||
#p13n.SdKeyScp80Kvn01AesMac,
|
||||
p13n.SdKeyScp80Kvn01DesDek,
|
||||
#p13n.SdKeyScp80Kvn01DesEnc,
|
||||
#p13n.SdKeyScp80Kvn01DesMac,
|
||||
#p13n.SdKeyScp80Kvn02AesDek,
|
||||
p13n.SdKeyScp80Kvn02AesEnc,
|
||||
#p13n.SdKeyScp80Kvn02AesMac,
|
||||
#p13n.SdKeyScp80Kvn02DesDek,
|
||||
#p13n.SdKeyScp80Kvn02DesEnc,
|
||||
p13n.SdKeyScp80Kvn02DesMac,
|
||||
#p13n.SdKeyScp80Kvn03AesDek,
|
||||
#p13n.SdKeyScp80Kvn03AesEnc,
|
||||
#p13n.SdKeyScp80Kvn03AesMac,
|
||||
p13n.SdKeyScp80Kvn03DesDek,
|
||||
#p13n.SdKeyScp80Kvn03DesEnc,
|
||||
#p13n.SdKeyScp80Kvn03DesMac,
|
||||
p13n.SdKeyScp81Kvn40Dek ,
|
||||
#p13n.SdKeyScp81Kvn40Tlspsk,
|
||||
#p13n.SdKeyScp81Kvn41Dek ,
|
||||
p13n.SdKeyScp81Kvn41Tlspsk,
|
||||
#p13n.SdKeyScp81Kvn42Dek ,
|
||||
#p13n.SdKeyScp81Kvn42Tlspsk,
|
||||
):
|
||||
|
||||
for key_len in sdkey_cls.allow_len:
|
||||
val = '0102030405060708091011121314151617181920212223242526272829303132'
|
||||
expect_clean_val = (b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16'
|
||||
b'\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31\x32')
|
||||
expect_val = '0102030405060708091011121314151617181920212223242526272829303132'
|
||||
|
||||
val = val[:key_len*2]
|
||||
expect_clean_val = expect_clean_val[:key_len]
|
||||
expect_val = val
|
||||
|
||||
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||
|
||||
# test bytes input
|
||||
val = expect_clean_val
|
||||
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||
|
||||
# test bytearray input
|
||||
val = bytearray(expect_clean_val)
|
||||
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||
|
||||
# test BytesIO input
|
||||
val = io.BytesIO(expect_clean_val)
|
||||
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||
|
||||
if key_len == 16:
|
||||
# test huge integer input.
|
||||
# needs to start with nonzero.. stupid
|
||||
val = 11020304050607080910111213141516
|
||||
expect_clean_val = (b'\x11\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15\x16')
|
||||
expect_val = '11020304050607080910111213141516'
|
||||
param_tests.append(Paramtest(param_cls=sdkey_cls, val=val, expect_clean_val=expect_clean_val, expect_val=expect_val))
|
||||
|
||||
outputs = []
|
||||
|
||||
for upp_fname in upp_fnames:
|
||||
test_idx = -1
|
||||
try:
|
||||
|
||||
der = resources.read_binary(smdpp_data.upp, upp_fname)
|
||||
|
||||
for t in param_tests:
|
||||
test_idx += 1
|
||||
logloc = f'{upp_fname} {t.param_cls.__name__}(val={valtypestr(t.val)})'
|
||||
|
||||
param = None
|
||||
try:
|
||||
param = t.param_cls()
|
||||
param.input_value = t.val
|
||||
param.validate()
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc}: {e}') from e
|
||||
|
||||
clean_val = param.value
|
||||
logloc = f'{logloc} clean_val={valtypestr(clean_val)}'
|
||||
if t.expect_clean_val is not None and t.expect_clean_val != clean_val:
|
||||
raise ValueError(f'{logloc}: expected'
|
||||
f' expect_clean_val={valtypestr(t.expect_clean_val)}')
|
||||
|
||||
# on my laptop, deepcopy is about 30% slower than decoding the DER from scratch:
|
||||
# pes = copy.deepcopy(orig_pes)
|
||||
pes = ProfileElementSequence.from_der(der)
|
||||
try:
|
||||
param.apply(pes)
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{logloc} apply_val(clean_val): {e}') from e
|
||||
|
||||
changed_der = pes.to_der()
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(changed_der)
|
||||
|
||||
read_back_val = t.param_cls.get_value_from_pes(pes2)
|
||||
|
||||
# compose log string to show the precise type of dict values
|
||||
if isinstance(read_back_val, dict):
|
||||
types = set()
|
||||
for v in read_back_val.values():
|
||||
types.add(f'{type(v).__name__}')
|
||||
|
||||
read_back_val_type = '{' + ', '.join(types) + '}'
|
||||
else:
|
||||
read_back_val_type = f'{type(read_back_val).__name__}'
|
||||
|
||||
logloc = (f'{logloc} read_back_val={valtypestr(read_back_val)}')
|
||||
|
||||
if isinstance(read_back_val, dict) and not t.param_cls.get_name() in read_back_val.keys():
|
||||
raise ValueError(f'{logloc}: expected to find name {t.param_cls.get_name()!r} in read_back_val')
|
||||
|
||||
expect_val = t.expect_val
|
||||
if not isinstance(expect_val, dict):
|
||||
expect_val = { t.param_cls.get_name(): expect_val }
|
||||
if read_back_val != expect_val:
|
||||
raise ValueError(f'{logloc}: expected {expect_val=!r}:{type(t.expect_val).__name__}')
|
||||
|
||||
ok = logloc.replace(' clean_val', '\n\tclean_val'
|
||||
).replace(' read_back_val', '\n\tread_back_val'
|
||||
).replace('=', '=\t'
|
||||
)
|
||||
output = f'\nok: {ok}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f'Error while testing UPP {upp_fname} {test_idx=}: {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_configurable_parameters'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: "{output[at:at+20]}" != "{xo_str[at:at+20]}"')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
@@ -21,7 +21,7 @@ import copy
|
||||
from osmocom.utils import h2b, b2h
|
||||
|
||||
from pySim.esim.saip import *
|
||||
from pySim.esim.saip.personalization import *
|
||||
from pySim.esim.saip import personalization
|
||||
from pprint import pprint as pp
|
||||
|
||||
|
||||
@@ -55,14 +55,56 @@ class SaipTest(unittest.TestCase):
|
||||
def test_personalization(self):
|
||||
"""Test some of the personalization operations."""
|
||||
pes = copy.deepcopy(self.pes)
|
||||
params = [Puk1('01234567'), Puk2(98765432), Pin1('1111'), Pin2(2222), Adm1('11111111'),
|
||||
K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
|
||||
params = [personalization.Puk1('01234567'),
|
||||
personalization.Puk2(98765432),
|
||||
personalization.Pin1('1111'),
|
||||
personalization.Pin2(2222),
|
||||
personalization.Adm1('11111111'),
|
||||
personalization.K(h2b('000102030405060708090a0b0c0d0e0f')),
|
||||
personalization.Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
|
||||
for p in params:
|
||||
p.validate()
|
||||
p.apply(pes)
|
||||
# TODO: we don't actually test the results here, but we just verify there is no exception
|
||||
pes.to_der()
|
||||
|
||||
def test_personalization2(self):
|
||||
"""Test some of the personalization operations."""
|
||||
cls = personalization.SdKeyScp80Kvn01DesEnc
|
||||
pes = ProfileElementSequence.from_der(self.per_input)
|
||||
prev_val = tuple(cls.get_values_from_pes(pes))
|
||||
print(f'{prev_val=}')
|
||||
self.assertTrue(prev_val)
|
||||
|
||||
set_val = '42342342342342342342342342342342'
|
||||
param = cls(set_val)
|
||||
param.validate()
|
||||
param.apply(pes)
|
||||
|
||||
get_val1 = tuple(cls.get_values_from_pes(pes))
|
||||
print(f'{get_val1=} {set_val=}')
|
||||
self.assertEqual(get_val1, ({cls.name: set_val},))
|
||||
|
||||
get_val1b = tuple(cls.get_values_from_pes(pes))
|
||||
print(f'{get_val1b=} {set_val=}')
|
||||
self.assertEqual(get_val1b, ({cls.name: set_val},))
|
||||
|
||||
der = pes.to_der()
|
||||
|
||||
get_val1c = tuple(cls.get_values_from_pes(pes))
|
||||
print(f'{get_val1c=} {set_val=}')
|
||||
self.assertEqual(get_val1c, ({cls.name: set_val},))
|
||||
|
||||
# assertTrue to not dump the entire der.
|
||||
# Expecting the modified DER to be different. If this assertion fails, then no change has happened in the output
|
||||
# DER and the ConfigurableParameter subclass is buggy.
|
||||
self.assertTrue(der != self.per_input)
|
||||
|
||||
pes2 = ProfileElementSequence.from_der(der)
|
||||
get_val2 = tuple(cls.get_values_from_pes(pes2))
|
||||
print(f'{get_val2=} {set_val=}')
|
||||
self.assertEqual(get_val2, ({cls.name: set_val},))
|
||||
|
||||
def test_constructor_encode(self):
|
||||
"""Test that DER-encoding of PE created by "empty" constructor works without raising exception."""
|
||||
for cls in [ProfileElementMF, ProfileElementPuk, ProfileElementPin, ProfileElementTelecom,
|
||||
|
||||
216
tests/unittests/test_param_src.py
Executable file
216
tests/unittests/test_param_src.py
Executable file
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
|
||||
#
|
||||
# Author: Neels Hofmeyr
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import math
|
||||
from importlib import resources
|
||||
import unittest
|
||||
from pySim.esim.saip import param_source
|
||||
|
||||
import xo
|
||||
update_expected_output = False
|
||||
|
||||
class D:
|
||||
mandatory = set()
|
||||
optional = set()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if (set(kwargs.keys()) - set(self.optional)) != set(self.mandatory):
|
||||
raise RuntimeError(f'{self.__class__.__name__}.__init__():'
|
||||
f' {set(kwargs.keys())=!r} - {self.optional=!r} != {self.mandatory=!r}')
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for k in self.optional:
|
||||
if not hasattr(self, k):
|
||||
setattr(self, k, None)
|
||||
|
||||
decimals = '0123456789'
|
||||
hexadecimals = '0123456789abcdefABCDEF'
|
||||
|
||||
class FakeRandom:
|
||||
vals = b'\xab\xcfm\xf0\x98J_\xcf\x96\x87fp5l\xe7f\xd1\xd6\x97\xc1\xf9]\x8c\x86+\xdb\t^ke\xc1r'
|
||||
i = 0
|
||||
|
||||
@classmethod
|
||||
def next(cls):
|
||||
cls.i = (cls.i + 1) % len(cls.vals)
|
||||
return cls.vals[cls.i]
|
||||
|
||||
@staticmethod
|
||||
def randint(a, b):
|
||||
d = b - a
|
||||
n_bytes = math.ceil(math.log(d, 2))
|
||||
r = int.from_bytes( bytes(FakeRandom.next() for i in range(n_bytes)) )
|
||||
return a + (r % (b - a))
|
||||
|
||||
@staticmethod
|
||||
def randbytes(n):
|
||||
return bytes(FakeRandom.next() for i in range(n))
|
||||
|
||||
|
||||
class ParamSourceTest(unittest.TestCase):
|
||||
|
||||
def test_param_source(self):
|
||||
|
||||
class ParamSourceTest(D):
|
||||
mandatory = (
|
||||
'param_source',
|
||||
'n',
|
||||
'expect',
|
||||
)
|
||||
optional = (
|
||||
'expect_arg',
|
||||
'csv_rows',
|
||||
)
|
||||
|
||||
def expect_const(t, vals):
|
||||
return tuple(t.expect_arg) == tuple(vals)
|
||||
|
||||
def expect_random(t, vals):
|
||||
chars = t.expect_arg.get('digits')
|
||||
repetitions = (t.n - len(set(vals)))
|
||||
if repetitions:
|
||||
raise RuntimeError(f'expect_random: there are {repetitions} repetitions in the returned values: {vals}')
|
||||
for val_i in range(len(vals)):
|
||||
v = vals[val_i]
|
||||
val_minlen = t.expect_arg.get('val_minlen')
|
||||
val_maxlen = t.expect_arg.get('val_maxlen')
|
||||
if len(v) < val_minlen or len(v) > val_maxlen:
|
||||
raise RuntimeError(f'expect_random: invalid length {len(v)} for value [{val_i}]: {v!r}, expecting'
|
||||
f' {val_minlen}..{val_maxlen}')
|
||||
|
||||
if chars is not None and not all(c in chars for c in v):
|
||||
raise RuntimeError(f'expect_random: invalid char in value [{val_i}]: {v!r}')
|
||||
return True
|
||||
|
||||
param_source_tests = [
|
||||
ParamSourceTest(param_source=param_source.ConstantSource.from_str('123'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('123', '123', '123')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('12345'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 5,
|
||||
'val_maxlen': 5,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('1..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 1,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomDigitSource.from_str('001..999'),
|
||||
n=10,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': decimals,
|
||||
'val_minlen': 3,
|
||||
'val_maxlen': 3,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('12345678'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('0*8'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.RandomHexDigitSource.from_str('00*4'),
|
||||
n=3,
|
||||
expect=expect_random,
|
||||
expect_arg={'digits': hexadecimals,
|
||||
'val_minlen': 8,
|
||||
'val_maxlen': 8,
|
||||
},
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.IncDigitSource.from_str('10001'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('10001', '10002', '10003')
|
||||
),
|
||||
ParamSourceTest(param_source=param_source.CsvSource('column_name'),
|
||||
n=3,
|
||||
expect=expect_const,
|
||||
expect_arg=('first val', 'second val', 'third val'),
|
||||
csv_rows=(
|
||||
{'column_name': 'first val',},
|
||||
{'column_name': 'second val',},
|
||||
{'column_name': 'third val',},
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
outputs = []
|
||||
|
||||
for t in param_source_tests:
|
||||
try:
|
||||
if hasattr(t.param_source, 'random_impl'):
|
||||
t.param_source.random_impl = FakeRandom
|
||||
|
||||
vals = []
|
||||
for i in range(t.n):
|
||||
csv_row = None
|
||||
if t.csv_rows is not None:
|
||||
csv_row = t.csv_rows[i]
|
||||
vals.append( t.param_source.get_next(csv_row=csv_row) )
|
||||
if not t.expect(t, vals):
|
||||
raise RuntimeError(f'invalid values returned: returned {vals}')
|
||||
output = f'ok: {t.param_source.__class__.__name__} {vals=!r}'
|
||||
outputs.append(output)
|
||||
print(output)
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f'{t.param_source.__class__.__name__} {t.n=} {t.expect.__name__}({t.expect_arg!r}): {e}') from e
|
||||
|
||||
output = '\n'.join(outputs) + '\n'
|
||||
xo_name = 'test_param_src'
|
||||
if update_expected_output:
|
||||
with resources.path(xo, xo_name) as xo_path:
|
||||
with open(xo_path, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
else:
|
||||
xo_str = resources.read_text(xo, xo_name)
|
||||
if xo_str != output:
|
||||
at = 0
|
||||
while at < len(output):
|
||||
if output[at] == xo_str[at]:
|
||||
at += 1
|
||||
continue
|
||||
break
|
||||
|
||||
raise RuntimeError(f'output differs from expected output at position {at}: {xo_str[at:at+128]!r}')
|
||||
|
||||
if __name__ == "__main__":
|
||||
if '-u' in sys.argv:
|
||||
update_expected_output = True
|
||||
sys.argv.remove('-u')
|
||||
unittest.main()
|
||||
2496
tests/unittests/xo/test_configurable_parameters
Normal file
2496
tests/unittests/xo/test_configurable_parameters
Normal file
File diff suppressed because it is too large
Load Diff
9
tests/unittests/xo/test_param_src
Normal file
9
tests/unittests/xo/test_param_src
Normal file
@@ -0,0 +1,9 @@
|
||||
ok: ConstantSource vals=['123', '123', '123']
|
||||
ok: RandomDigitSource vals=['13987', '49298', '55670']
|
||||
ok: RandomDigitSource vals=['650', '580', '49', '885', '497', '195', '320', '137', '245', '663']
|
||||
ok: RandomDigitSource vals=['638', '025', '232', '779', '826', '972', '650', '580', '049', '885']
|
||||
ok: RandomHexDigitSource vals=['6b65c172', 'abcf6df0', '984a5fcf']
|
||||
ok: RandomHexDigitSource vals=['96876670', '356ce766', 'd1d697c1']
|
||||
ok: RandomHexDigitSource vals=['f95d8c86', '2bdb095e', '6b65c172']
|
||||
ok: IncDigitSource vals=['10001', '10002', '10003']
|
||||
ok: CsvSource vals=['first val', 'second val', 'third val']
|
||||
Reference in New Issue
Block a user