mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-05-03 15:08:52 +03:00
Compare commits
38 Commits
neels/smsp
...
neels/wip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c224990d74 | ||
|
|
3d232ad5ef | ||
|
|
b2a2154abd | ||
|
|
52875ee71e | ||
|
|
2f867e702a | ||
|
|
43dd5e33be | ||
|
|
5983f387d5 | ||
|
|
b34155fddb | ||
|
|
d0e5f1eb1d | ||
|
|
c5e2ddc987 | ||
|
|
46866a42ea | ||
|
|
1ee69dbcdf | ||
|
|
a66f999cc4 | ||
|
|
6b03546f00 | ||
|
|
4c90f60168 | ||
|
|
02bb0c3e28 | ||
|
|
1e2725db12 | ||
|
|
067c764561 | ||
|
|
b08e5b8c1f | ||
|
|
7c8ec2dd77 | ||
|
|
8225458201 | ||
|
|
feaabf6955 | ||
|
|
84774ef06e | ||
|
|
90ff3dee4e | ||
|
|
da0385a56a | ||
|
|
8cf59eda25 | ||
|
|
bcec3a29b1 | ||
|
|
c8ea5eead7 | ||
|
|
52c8d81957 | ||
|
|
33f3e01a22 | ||
|
|
a921722412 | ||
|
|
09d2e20be4 | ||
|
|
13a0de20a0 | ||
|
|
2bdd22768c | ||
|
|
0f7931d03c | ||
|
|
4a41503b9c | ||
|
|
c50f4b4a02 | ||
|
|
816b31eb07 |
@@ -97,7 +97,7 @@ Please install the following dependencies:
|
||||
- pyscard
|
||||
- pyserial
|
||||
- pytlv
|
||||
- pyyaml >= 5.1
|
||||
- pyyaml >= 5.4
|
||||
- smpp.pdu (from `github.com/hologram-io/smpp.pdu`)
|
||||
- termcolor
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
import hashlib
|
||||
import argparse
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
@@ -436,7 +435,7 @@ def gen_parameters(opts):
|
||||
if not re.match('^[0-9a-fA-F]{32}$', ki):
|
||||
raise ValueError('Ki needs to be 128 bits, in hex format')
|
||||
else:
|
||||
ki = ''.join(['%02x' % random.randrange(0, 256) for i in range(16)])
|
||||
ki = os.urandom(16).hex()
|
||||
|
||||
# OPC (random)
|
||||
if opts.opc is not None:
|
||||
@@ -447,7 +446,7 @@ def gen_parameters(opts):
|
||||
elif opts.op is not None:
|
||||
opc = derive_milenage_opc(ki, opts.op)
|
||||
else:
|
||||
opc = ''.join(['%02x' % random.randrange(0, 256) for i in range(16)])
|
||||
opc = os.urandom(16).hex()
|
||||
|
||||
pin_adm = sanitize_pin_adm(opts.pin_adm, opts.pin_adm_hex)
|
||||
|
||||
|
||||
@@ -16,6 +16,12 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import requests
|
||||
from klein import Klein
|
||||
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
|
||||
from twisted.internet.posixbase import PosixReactorBase
|
||||
from pathlib import Path
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import time
|
||||
@@ -123,10 +129,12 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
class DownloadOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/downloadOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType
|
||||
}
|
||||
input_mandatory = ['header']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'iccid': param.Iccid,
|
||||
@@ -137,6 +145,7 @@ class DownloadOrder(Es2PlusApiFunction):
|
||||
class ConfirmOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/confirmOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
@@ -144,7 +153,7 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
'smdsAddress': param.SmdsAddress,
|
||||
'releaseFlag': param.ReleaseFlag,
|
||||
}
|
||||
input_mandatory = ['iccid', 'releaseFlag']
|
||||
input_mandatory = ['header', 'iccid', 'releaseFlag']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'eid': param.Eid,
|
||||
@@ -157,12 +166,13 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
class CancelOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/cancelOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
|
||||
}
|
||||
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
|
||||
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -172,9 +182,10 @@ class CancelOrder(Es2PlusApiFunction):
|
||||
class ReleaseProfile(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/releaseProfile'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
}
|
||||
input_mandatory = ['iccid']
|
||||
input_mandatory = ['header', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -184,6 +195,7 @@ class ReleaseProfile(Es2PlusApiFunction):
|
||||
class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType,
|
||||
@@ -192,10 +204,9 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
'notificationPointStatus': param.NotificationPointStatus,
|
||||
'resultData': param.ResultData,
|
||||
}
|
||||
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
expected_http_status = 204
|
||||
|
||||
|
||||
class Es2pApiClient:
|
||||
"""Main class representing a full ES2+ API client. Has one method for each API function."""
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
|
||||
@@ -206,18 +217,17 @@ class Es2pApiClient:
|
||||
if client_cert:
|
||||
self.session.cert = client_cert
|
||||
|
||||
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
|
||||
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
|
||||
|
||||
def call_downloadOrder(self, data: dict) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(data, self._gen_func_id())
|
||||
@@ -237,3 +247,116 @@ class Es2pApiClient:
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
|
||||
|
||||
class Es2pApiServerHandlerSmdpp(abc.ABC):
|
||||
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_downloadOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_confirmOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_cancelOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_releaseProfile(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServerHandlerMno(abc.ABC):
|
||||
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServer(abc.ABC):
|
||||
"""Main class representing a full ES2+ API server. Has one method for each API function."""
|
||||
app = None
|
||||
|
||||
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
|
||||
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
|
||||
self.port = port
|
||||
self.interface = interface
|
||||
if server_cert:
|
||||
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
|
||||
else:
|
||||
self.server_cert = None
|
||||
if client_cert_verify:
|
||||
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
|
||||
else:
|
||||
self.client_cert_verify = None
|
||||
|
||||
def reactor(self, reactor: PosixReactorBase):
|
||||
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
|
||||
if self.server_cert:
|
||||
if self.client_cert_verify:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
|
||||
return defer.Deferred()
|
||||
|
||||
class Es2pApiServerSmdpp(Es2pApiServer):
|
||||
"""ES2+ (SMDP+ side) API Server."""
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
|
||||
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
|
||||
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
|
||||
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(DownloadOrder.path)
|
||||
def call_downloadOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(request)
|
||||
|
||||
@app.route(ConfirmOrder.path)
|
||||
def call_confirmOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
return self.confirmOrder.call(request)
|
||||
|
||||
@app.route(CancelOrder.path)
|
||||
def call_cancelOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
return self.cancelOrder.call(request)
|
||||
|
||||
@app.route(ReleaseProfile.path)
|
||||
def call_releaseProfile(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
return self.releaseProfile.call(request)
|
||||
|
||||
class Es2pApiServerMno(Es2pApiServer):
|
||||
"""ES2+ (MNO side) API Server."""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
|
||||
handler.call_handleDownloadProgressInfo)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(HandleDownloadProgressInfo.path)
|
||||
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(request)
|
||||
|
||||
@@ -155,11 +155,11 @@ class Es9pApiClient:
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
|
||||
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
|
||||
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
|
||||
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
@@ -19,8 +19,10 @@ import abc
|
||||
import requests
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
import base64
|
||||
from twisted.web.server import Request
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -131,6 +133,16 @@ class JsonResponseHeader(ApiParam):
|
||||
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
||||
raise ValueError('Unknown/unspecified status "%s"' % status)
|
||||
|
||||
class JsonRequestHeader(ApiParam):
|
||||
"""SGP.22 section 6.5.1.3."""
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
func_req_id = data.get('functionRequesterIdentifier')
|
||||
if not func_req_id:
|
||||
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
||||
func_call_id = data.get('functionCallIdentifier')
|
||||
if not func_call_id:
|
||||
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
||||
|
||||
class HttpStatusError(Exception):
|
||||
pass
|
||||
@@ -161,65 +173,118 @@ class ApiError(Exception):
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
||||
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
||||
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
||||
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
||||
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
||||
# prefix.
|
||||
|
||||
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder', see also method rewrite_url).
|
||||
path = None
|
||||
|
||||
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
||||
input_params = {}
|
||||
|
||||
# list of mandatory input parameters
|
||||
input_mandatory = []
|
||||
|
||||
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
||||
output_params = {}
|
||||
|
||||
# list of mandatory output parameters (for successful response)
|
||||
output_mandatory = []
|
||||
|
||||
# list of mandatory output parameters (for failed response)
|
||||
output_mandatory_failed = []
|
||||
|
||||
# expected HTTP status code of the response
|
||||
expected_http_status = 200
|
||||
|
||||
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
||||
http_method = 'POST'
|
||||
|
||||
# additional custom HTTP headers (client requests)
|
||||
extra_http_req_headers = {}
|
||||
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
# additional custom HTTP headers (server responses)
|
||||
extra_http_res_headers = {}
|
||||
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
def __new__(cls, *args, role = 'legacy_client', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
args: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
|
||||
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
"""
|
||||
|
||||
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
|
||||
# decode_ methods below). The dictionary will not include any dunder/magic methods
|
||||
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
|
||||
|
||||
# Normal instantiation as JsonHttpApiFunction:
|
||||
if len(args) == 0 and len(kwargs) == 0:
|
||||
return type(cls.__name__, (abc.ABC,), cls_attr)()
|
||||
|
||||
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
|
||||
if role == 'legacy_client':
|
||||
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
|
||||
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
|
||||
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
|
||||
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
|
||||
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
result.legacy = True
|
||||
return result
|
||||
elif role == 'client':
|
||||
# Create a JsonHttpApiFunction in client role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
elif role == 'server':
|
||||
# Create a JsonHttpApiFunction in server role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
|
||||
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
else:
|
||||
raise ValueError('Invalid role \'%s\' specified' % role)
|
||||
|
||||
def encode_client(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
|
||||
# pylint: disable=no-member
|
||||
if hasattr(self, 'legacy') and self.legacy:
|
||||
output[p] = JsonRequestHeader.encode(v)
|
||||
else:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode(self, data: dict) -> dict:
|
||||
def decode_client(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the response body."""
|
||||
output = {}
|
||||
if 'header' in self.output_params:
|
||||
# let's first do the header, it's special
|
||||
if not 'header' in data:
|
||||
raise ValueError('Mandatory output parameter "header" missing')
|
||||
hdr_class = self.output_params.get('header')
|
||||
output['header'] = hdr_class.decode(data['header'])
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
# we can only expect mandatory parameters to be present in case of successful execution
|
||||
for p in self.output_mandatory:
|
||||
if p == 'header':
|
||||
continue
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
@@ -231,35 +296,195 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def encode_server(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.output_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_server(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the request body."""
|
||||
output = {}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def rewrite_url(self, data: dict, url: str) -> Tuple[dict, str]:
|
||||
"""
|
||||
Rewrite a static URL using information passed in the data dict. This method may be overloaded by a derived
|
||||
class to allow fully dynamic URLs. The input parameters required for the URL rewriting may be passed using
|
||||
data parameter. In case those parameters are additional parameters that are not intended to be passed to
|
||||
the encode_client method later, they must be removed explcitly.
|
||||
|
||||
Args:
|
||||
data: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
url: statically generated URL string (see comment in JsonHttpApiClient)
|
||||
"""
|
||||
|
||||
# This implementation is a placeholder in which we do not perform any URL rewriting. We just pass through data
|
||||
# and url unmodified.
|
||||
return data, url
|
||||
|
||||
class JsonHttpApiClient():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
||||
session: requests.Session):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
|
||||
func_req_id : function requestor id to use for requests
|
||||
session : session object (requests)
|
||||
"""
|
||||
self.api_func = api_func
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
url = self.url_prefix + self.path
|
||||
encoded = json.dumps(self.encode(data, func_call_id))
|
||||
"""
|
||||
Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
||||
json-serializable fields. `data` may also contain additional parameters required for URL rewriting (see
|
||||
rewrite_url in class JsonHttpApiFunction). Output data is returned as json-deserialized dict.
|
||||
|
||||
Args:
|
||||
data: Input data required to perform the request.
|
||||
func_call_id: Function Call Identifier, if present a header field is generated automatically.
|
||||
timeout: Maximum amount of time to wait for the request to complete.
|
||||
"""
|
||||
|
||||
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
||||
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
||||
# field is checked by the encode_client method)
|
||||
if func_call_id:
|
||||
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id}} | data
|
||||
|
||||
# The URL used for the HTTP request (see below) normally consists of the initially given url_prefix
|
||||
# concatenated with the path defined by the JsonHttpApiFunction definition. This static URL path may be
|
||||
# rewritten by rewrite_url method defined in the JsonHttpApiFunction.
|
||||
data, url = self.api_func.rewrite_url(data, self.url_prefix + self.api_func.path)
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_client(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
req_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
req_headers.update(self.extra_http_req_headers)
|
||||
req_headers.update(self.api_func.extra_http_req_headers)
|
||||
|
||||
# Perform HTTP request
|
||||
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
||||
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
||||
logger.debug("HTTP RSP: %s" % (response.content))
|
||||
|
||||
if response.status_code != self.expected_http_status:
|
||||
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
||||
# SGP.22, section 6.5.1)
|
||||
if response.status_code != self.api_func.expected_http_status:
|
||||
raise HttpStatusError(response)
|
||||
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
if response.content and not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
raise HttpHeaderError(response)
|
||||
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
||||
raise HttpHeaderError(response)
|
||||
|
||||
# Decode response and return the result back to the caller
|
||||
if response.content:
|
||||
if response.headers.get('Content-Type').startswith('application/json'):
|
||||
return self.decode(response.json())
|
||||
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
|
||||
return { 'data': response.content.decode('utf-8') }
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
|
||||
output = self.api_func.decode_client(response.json())
|
||||
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
||||
# (the presence of the header field is checked by the decode_client method)
|
||||
if 'header' in output:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
return output
|
||||
return None
|
||||
|
||||
class JsonHttpApiServer():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
call_handler : handler function to process the request. This function must accept the
|
||||
decoded request as a dictionary. The handler function must return a tuple consisting
|
||||
of the response in the form of a dictionary (may be empty), and a function execution
|
||||
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
||||
"""
|
||||
self.api_func = api_func
|
||||
if call_handler:
|
||||
self.call_handler = call_handler
|
||||
else:
|
||||
self.call_handler = self.default_handler
|
||||
|
||||
def default_handler(self, data: dict) -> (dict, str):
|
||||
"""default handler, used in case no call handler is provided."""
|
||||
logger.error("no handler function for request: %s" % str(data))
|
||||
return {}, 'Failed'
|
||||
|
||||
def call(self, request: Request) -> str:
|
||||
""" Process an incoming request.
|
||||
Args:
|
||||
request : request object as received using twisted.web.server
|
||||
Returns:
|
||||
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
||||
provided the request object)
|
||||
"""
|
||||
|
||||
# Make sure the request is done with the correct HTTP method
|
||||
if (request.method.decode() != self.api_func.http_method):
|
||||
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
||||
|
||||
# Decode the request
|
||||
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
||||
|
||||
# Run call handler (see above)
|
||||
data, fe_status = self.call_handler(decoded_request)
|
||||
|
||||
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
||||
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
||||
if fe_status:
|
||||
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_server(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
res_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
res_headers.update(self.api_func.extra_http_res_headers)
|
||||
for header, value in res_headers.items():
|
||||
request.setHeader(header, value)
|
||||
request.setResponseCode(self.api_func.expected_http_status)
|
||||
|
||||
# Return the encoded result back to the caller for sending (using twisted/klein)
|
||||
return encoded
|
||||
|
||||
|
||||
@@ -20,9 +20,6 @@
|
||||
|
||||
import copy
|
||||
import pprint
|
||||
import logging
|
||||
import traceback
|
||||
import inspect
|
||||
from typing import List, Generator
|
||||
from pySim.esim.saip.personalization import ConfigurableParameter
|
||||
from pySim.esim.saip import param_source
|
||||
@@ -30,10 +27,6 @@ from pySim.esim.saip import ProfileElementSequence, ProfileElementSD
|
||||
from pySim.global_platform import KeyUsageQualifier
|
||||
from osmocom.utils import b2h
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
def _func_():
|
||||
return inspect.currentframe().f_back.f_code.co_name
|
||||
|
||||
class BatchPersonalization:
|
||||
"""Produce a series of eSIM profiles from predefined parameters.
|
||||
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
|
||||
@@ -64,15 +57,18 @@ class BatchPersonalization:
|
||||
"""
|
||||
|
||||
class ParamAndSrc:
|
||||
'tie a ConfigurableParameter to a source of actual values'
|
||||
"""tie a ConfigurableParameter to a source of actual values"""
|
||||
def __init__(self, param: ConfigurableParameter, src: param_source.ParamSource):
|
||||
self.param = param
|
||||
if isinstance(param, type):
|
||||
self.param_cls = param
|
||||
else:
|
||||
self.param_cls = param.__class__
|
||||
self.src = src
|
||||
|
||||
def __init__(self,
|
||||
n: int,
|
||||
src_pes: ProfileElementSequence,
|
||||
params: list[ParamAndSrc]=None,
|
||||
params: list[ParamAndSrc]=[],
|
||||
csv_rows: Generator=None,
|
||||
):
|
||||
"""
|
||||
@@ -81,10 +77,10 @@ class BatchPersonalization:
|
||||
copied.
|
||||
params: list of ParamAndSrc instances, defining a ConfigurableParameter and corresponding ParamSource to fill in
|
||||
profile values.
|
||||
csv_rows: A list or generator producing all CSV rows one at a time, starting with a row containing the column
|
||||
headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||
ParamSource.get_next(), such that ParamSource implementations can access the row items.
|
||||
See param_source.CsvSource.
|
||||
csv_rows: A generator (e.g. iter(list_of_rows)) producing all CSV rows one at a time, starting with a row
|
||||
containing the column headers. This is compatible with the python csv.reader. Each row gets passed to
|
||||
ParamSource.get_next(), such that ParamSource implementations can access the row items. See
|
||||
param_source.CsvSource.
|
||||
"""
|
||||
self.n = n
|
||||
self.params = params or []
|
||||
@@ -92,7 +88,7 @@ class BatchPersonalization:
|
||||
self.csv_rows = csv_rows
|
||||
|
||||
def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
|
||||
self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
|
||||
self.params.append(BatchPersonalization.ParamAndSrc(param, src))
|
||||
|
||||
def generate_profiles(self):
|
||||
# get first row of CSV: column names
|
||||
@@ -119,12 +115,10 @@ class BatchPersonalization:
|
||||
try:
|
||||
input_value = p.src.get_next(csv_row=csv_row)
|
||||
assert input_value is not None
|
||||
value = p.param.__class__.validate_val(input_value)
|
||||
p.param.__class__.apply_val(pes, value)
|
||||
value = p.param_cls.validate_val(input_value)
|
||||
p.param_cls.apply_val(pes, value)
|
||||
except Exception as e:
|
||||
print(traceback.format_exc())
|
||||
logger.error('during %s: %r', _func_(), e)
|
||||
raise ValueError(f'{p.param.name} fed by {p.src.name}: {e!r}') from e
|
||||
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e
|
||||
|
||||
yield pes
|
||||
|
||||
@@ -138,7 +132,7 @@ class UppAudit(dict):
|
||||
|
||||
@classmethod
|
||||
def from_der(cls, der: bytes, params: List, der_size=False, additional_sd_keys=False):
|
||||
'''return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||
"""return a dict of parameter name and set of selected parameter values found in a DER encoded profile. Note:
|
||||
some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi returns
|
||||
both 'IMSI' and 'IMSI-ACC' parameters.
|
||||
|
||||
@@ -160,7 +154,7 @@ class UppAudit(dict):
|
||||
Scp80Kvn03. So we would not show kvn 0x04..0x0f in an audit. additional_sd_keys=True includes audits of all SD
|
||||
key KVN there may be in the UPP. This helps to spot SD keys that may already be present in a UPP template, with
|
||||
unexpected / unusual kvn.
|
||||
'''
|
||||
"""
|
||||
|
||||
# make an instance of this class
|
||||
upp_audit = cls()
|
||||
@@ -326,7 +320,7 @@ class BatchAudit(list):
|
||||
return batch_audit
|
||||
|
||||
def to_csv_rows(self, headers=True, sort_key=None):
|
||||
'''generator that yields all audits' values as rows, useful feed to a csv.writer.'''
|
||||
"""generator that yields all audits' values as rows, useful feed to a csv.writer."""
|
||||
columns = set()
|
||||
for audit in self:
|
||||
columns.update(audit.keys())
|
||||
|
||||
@@ -37,13 +37,10 @@ class ParamSource:
|
||||
name = "none"
|
||||
numeric_base = None # or 10 or 16
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
"""Subclasses implement this:
|
||||
if a parameter source defines some string input magic, override this function.
|
||||
For example, a RandomDigitSource derives the number of digits from the string length,
|
||||
so the user can enter '0000' to get a four digit random number."""
|
||||
return cls(s)
|
||||
def __init__(self, input_str:str):
|
||||
"""Subclasses should call super().__init__(input_str) before evaluating self.input_str. Each subclass __init__()
|
||||
may in turn manipulate self.input_str to apply expansions or decodings."""
|
||||
self.input_str = input_str
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
"""Subclasses implement this: return the next value from the parameter source.
|
||||
@@ -51,78 +48,81 @@ class ParamSource:
|
||||
This default implementation is an empty source."""
|
||||
raise ParamSourceExhaustedExn()
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, input_str:str):
|
||||
"""compatibility with earlier version of ParamSource. Just use the constructor."""
|
||||
return cls(input_str)
|
||||
|
||||
class ConstantSource(ParamSource):
|
||||
"""one value for all"""
|
||||
name = "constant"
|
||||
|
||||
def __init__(self, val:str):
|
||||
self.val = val
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
return self.val
|
||||
return self.input_str
|
||||
|
||||
class InputExpandingParamSource(ParamSource):
|
||||
|
||||
def __init__(self, input_str:str):
|
||||
super().__init__(input_str)
|
||||
self.input_str = self.expand_input_str(self.input_str)
|
||||
|
||||
@classmethod
|
||||
def expand_str(cls, s:str):
|
||||
def expand_input_str(cls, input_str:str):
|
||||
# user convenience syntax '0*32' becomes '00000000000000000000000000000000'
|
||||
if "*" not in s:
|
||||
return s
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", s)
|
||||
if "*" not in input_str:
|
||||
return input_str
|
||||
# re: "XX * 123" with optional spaces
|
||||
tokens = re.split(r"([^ \t]+)[ \t]*\*[ \t]*([0-9]+)", input_str)
|
||||
if len(tokens) < 3:
|
||||
return s
|
||||
return input_str
|
||||
parts = []
|
||||
for unchanged, snippet, repeat_str in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
|
||||
parts.append(unchanged)
|
||||
repeat = int(repeat_str)
|
||||
parts.append(snippet * repeat)
|
||||
return "".join(parts)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
return cls(cls.expand_str(s))
|
||||
return "".join(parts)
|
||||
|
||||
class DecimalRangeSource(InputExpandingParamSource):
|
||||
"""abstract: decimal numbers with a value range"""
|
||||
|
||||
numeric_base = 10
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
"""
|
||||
See also from_str().
|
||||
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||
"""Constructor to set up values from a (user entered) string: DecimalRangeSource(input_str).
|
||||
Constructor to set up values directly: DecimalRangeSource(num_digits=3, first_value=123, last_value=456)
|
||||
|
||||
All arguments are integer values, and are converted to int if necessary, so a string of an integer is fine.
|
||||
num_digits: fixed number of digits (possibly with leading zeros) to generate.
|
||||
first_value, last_value: the decimal range in which to provide digits.
|
||||
num_digits produces leading zeros when first_value..last_value are shorter.
|
||||
"""
|
||||
num_digits = int(num_digits)
|
||||
first_value = int(first_value)
|
||||
last_value = int(last_value)
|
||||
assert ((input_str is not None and (num_digits, first_value, last_value) == (None, None, None))
|
||||
or (input_str is None and None not in (num_digits, first_value, last_value)))
|
||||
|
||||
if input_str is not None:
|
||||
super().__init__(input_str)
|
||||
|
||||
input_str = self.input_str
|
||||
|
||||
if ".." in input_str:
|
||||
first_str, last_str = input_str.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = input_str.strip()
|
||||
last_str = None
|
||||
|
||||
num_digits = len(first_str)
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str if last_str is not None else "9" * num_digits)
|
||||
|
||||
assert num_digits > 0
|
||||
assert first_value <= last_value
|
||||
self.num_digits = num_digits
|
||||
self.val_first_last = (first_value, last_value)
|
||||
self.first_value = first_value
|
||||
self.last_value = last_value
|
||||
|
||||
def val_to_digit(self, val:int):
|
||||
return "%0*d" % (self.num_digits, val) # pylint: disable=consider-using-f-string
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
|
||||
if ".." in s:
|
||||
first_str, last_str = s.split('..')
|
||||
first_str = first_str.strip()
|
||||
last_str = last_str.strip()
|
||||
else:
|
||||
first_str = s.strip()
|
||||
last_str = None
|
||||
|
||||
first_value = int(first_str)
|
||||
last_value = int(last_str) if last_str is not None else "9" * len(first_str)
|
||||
return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
|
||||
|
||||
class RandomSourceMixin:
|
||||
random_impl = secrets.SystemRandom()
|
||||
|
||||
@@ -135,7 +135,7 @@ class RandomDigitSource(DecimalRangeSource, RandomSourceMixin):
|
||||
# try to generate random digits that are always different from previously produced random bytes
|
||||
attempts = 10
|
||||
while True:
|
||||
val = self.random_impl.randint(*self.val_first_last)
|
||||
val = self.random_impl.randint(self.first_value, self.last_value)
|
||||
if val in RandomDigitSource.used_keys:
|
||||
attempts -= 1
|
||||
if attempts:
|
||||
@@ -150,9 +150,11 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
numeric_base = 16
|
||||
used_keys = set()
|
||||
|
||||
def __init__(self, num_digits):
|
||||
"""see from_str()"""
|
||||
num_digits = int(num_digits)
|
||||
def __init__(self, input_str:str):
|
||||
super().__init__(input_str)
|
||||
input_str = self.input_str
|
||||
|
||||
num_digits = len(input_str.strip())
|
||||
if num_digits < 1:
|
||||
raise ValueError("zero number of digits")
|
||||
# hex digits always come in two
|
||||
@@ -174,23 +176,20 @@ class RandomHexDigitSource(InputExpandingParamSource, RandomSourceMixin):
|
||||
|
||||
return b2h(val)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, s:str):
|
||||
s = cls.expand_str(s)
|
||||
return cls(num_digits=len(s.strip()))
|
||||
|
||||
class IncDigitSource(DecimalRangeSource):
|
||||
"""incrementing sequence of digits"""
|
||||
name = "incrementing decimal digits"
|
||||
|
||||
def __init__(self, num_digits, first_value, last_value):
|
||||
super().__init__(num_digits, first_value, last_value)
|
||||
def __init__(self, input_str:str=None, num_digits:int=None, first_value:int=None, last_value:int=None):
|
||||
"""input_str: the first value to return, a string of an integer number with optional leading zero digits. The
|
||||
leading zero digits are preserved."""
|
||||
super().__init__(input_str, num_digits, first_value, last_value)
|
||||
self.next_val = None
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"""Restart from the first value of the defined range passed to __init__()."""
|
||||
self.next_val = self.val_first_last[0]
|
||||
self.next_val = self.first_value
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = self.next_val
|
||||
@@ -200,7 +199,7 @@ class IncDigitSource(DecimalRangeSource):
|
||||
returnval = self.val_to_digit(val)
|
||||
|
||||
val += 1
|
||||
if val > self.val_first_last[1]:
|
||||
if val > self.last_value:
|
||||
self.next_val = None
|
||||
else:
|
||||
self.next_val = val
|
||||
@@ -211,13 +210,15 @@ class CsvSource(ParamSource):
|
||||
"""apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)"""
|
||||
name = "from CSV"
|
||||
|
||||
def __init__(self, csv_column):
|
||||
def __init__(self, input_str:str):
|
||||
"""self.csv_column = input_str:
|
||||
column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
csv_column: column name indicating the column to use for this parameter.
|
||||
This name is used in get_next(): the caller passes the current CSV row to get_next(), from which
|
||||
CsvSource picks the column with the name matching csv_column.
|
||||
"""
|
||||
self.csv_column = csv_column
|
||||
"""Parse input_str into self.num_digits, self.first_value, self.last_value."""
|
||||
super().__init__(input_str)
|
||||
self.csv_column = self.input_str
|
||||
|
||||
def get_next(self, csv_row:dict=None):
|
||||
val = None
|
||||
|
||||
@@ -330,6 +330,7 @@ class DecimalHexParam(DecimalParam):
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
assert isinstance(val, str)
|
||||
val = ''.join('%02x' % ord(x) for x in val)
|
||||
if cls.rpad is not None:
|
||||
c = cls.rpad_char
|
||||
@@ -339,7 +340,7 @@ class DecimalHexParam(DecimalParam):
|
||||
|
||||
@classmethod
|
||||
def decimal_hex_to_str(cls, val):
|
||||
'useful for get_values_from_pes() implementations of subclasses'
|
||||
"""useful for get_values_from_pes() implementations of subclasses"""
|
||||
if isinstance(val, bytes):
|
||||
val = b2h(val)
|
||||
assert isinstance(val, hexstr)
|
||||
@@ -618,11 +619,28 @@ class SmspTpScAddr(ConfigurableParameter):
|
||||
ef_smsp_dec['tp_sc_addr']['ton_npi']['type_of_number'] = 'international' if international else 'unknown'
|
||||
# ensure the parameter_indicators.tp_sc_addr is True
|
||||
ef_smsp_dec['parameter_indicators']['tp_sc_addr'] = True
|
||||
|
||||
# alpha_id padding: to make room for a human readable SMSC name that can be provisioned to the profile later
|
||||
# on, alpha_id needs to be empty but padded 0xff to some length.
|
||||
# - alpha_id is optional, setting alpha_id = '' ensures the IE is present.
|
||||
# - the length of the file is 28+Y where Y is the length of the alpha_id -- here the intended length of our padding
|
||||
# (see 3GPP TS 31.102 4.2.27 EF.SMSP). So if we want a maximum length of alpha_id = 14, we set the total
|
||||
# file size to 28+14 = 42.
|
||||
# - this file size has to go in two places: encode_record_bin() needs to know the length to encode the right
|
||||
# length of fillFileContent.
|
||||
# - the f_smsp needs to show the right file size in the PES, as in
|
||||
# 'ef-smsp': [('fileDescriptor', {'efFileSize': '2a', ...
|
||||
# (where 2a == 42)
|
||||
# - To generate the right amount of fillFileContent, pass total_len=42 to encode_record_bin().
|
||||
# - To show the right size in the PES, set f_smsp.rec_len = 42
|
||||
ef_smsp_dec['alpha_id'] = ''
|
||||
# re-encode into the File body
|
||||
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1)
|
||||
f_smsp.rec_len = 42
|
||||
|
||||
# re-encode into the File body.
|
||||
#
|
||||
#print("SMSP (new): %s" % f_smsp.body)
|
||||
# re-generate the pe.decoded member from the File instance
|
||||
f_smsp.body = ef_smsp.encode_record_bin(ef_smsp_dec, 1, total_len=f_smsp.rec_len)
|
||||
pe.file2pe(f_smsp)
|
||||
|
||||
@classmethod
|
||||
@@ -674,10 +692,8 @@ class MncLen(ConfigurableParameter):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
if not hasattr(pe, 'files'):
|
||||
continue
|
||||
f_ad = pe.files.get('ef-ad')
|
||||
if not f_ad:
|
||||
continue
|
||||
# decode existing values
|
||||
f_ad = pe.files['ef-ad']
|
||||
if not f_ad.body:
|
||||
continue
|
||||
try:
|
||||
@@ -695,24 +711,25 @@ class MncLen(ConfigurableParameter):
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
if not hasattr(pe, 'files'):
|
||||
continue
|
||||
f_ad = pe.files.get('ef-ad', None)
|
||||
if f_ad is None:
|
||||
continue
|
||||
for naa in ('isim',):# 'isim', 'csim'):
|
||||
for pe in pes.get_pes_for_type(naa):
|
||||
if not hasattr(pe, 'files'):
|
||||
continue
|
||||
f_ad = pe.files.get('ef-ad', None)
|
||||
if f_ad is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
ef_ad = EF_AD()
|
||||
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
|
||||
except StreamError:
|
||||
continue
|
||||
try:
|
||||
ef_ad = EF_AD()
|
||||
ef_ad_dec = ef_ad.decode_bin(f_ad.body)
|
||||
except StreamError:
|
||||
continue
|
||||
|
||||
mnc_len = ef_ad_dec.get('mnc_len', None)
|
||||
if mnc_len is None:
|
||||
continue
|
||||
mnc_len = ef_ad_dec.get('mnc_len', None)
|
||||
if mnc_len is None:
|
||||
continue
|
||||
|
||||
yield { cls.name: str(mnc_len) }
|
||||
yield { cls.name: str(mnc_len) }
|
||||
|
||||
|
||||
class SdKey(BinaryParam):
|
||||
|
||||
@@ -6,7 +6,7 @@ jsonpath-ng
|
||||
construct>=2.10.70
|
||||
bidict
|
||||
pyosmocom>=0.0.12
|
||||
pyyaml>=5.1
|
||||
pyyaml>=5.4
|
||||
termcolor
|
||||
colorlog
|
||||
pycryptodomex
|
||||
|
||||
2
setup.py
2
setup.py
@@ -26,7 +26,7 @@ setup(
|
||||
"construct >= 2.10.70",
|
||||
"bidict",
|
||||
"pyosmocom >= 0.0.12",
|
||||
"pyyaml >= 5.1",
|
||||
"pyyaml >= 5.4",
|
||||
"termcolor",
|
||||
"colorlog",
|
||||
"pycryptodomex",
|
||||
|
||||
@@ -267,16 +267,6 @@ class ConfigurableParameterTest(unittest.TestCase):
|
||||
'11111111111111111111111111111111'
|
||||
'22222222222222222222222222222222'),
|
||||
|
||||
|
||||
Paramtest(param_cls=p13n.MncLen,
|
||||
val='2',
|
||||
expect_clean_val=2,
|
||||
expect_val='2'),
|
||||
Paramtest(param_cls=p13n.MncLen,
|
||||
val=3,
|
||||
expect_clean_val=3,
|
||||
expect_val='3'),
|
||||
|
||||
]
|
||||
|
||||
for sdkey_cls in (
|
||||
|
||||
Reference in New Issue
Block a user