mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-04-10 15:01:05 +03:00
Compare commits
49 Commits
osmith/sai
...
neels/saip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4b5841f78 | ||
|
|
efc3f06f38 | ||
|
|
7d73ebf641 | ||
|
|
462346bb71 | ||
|
|
0c33330056 | ||
|
|
aa2eaee266 | ||
|
|
03e17a7366 | ||
|
|
59bec08eae | ||
|
|
5c34d36b48 | ||
|
|
5c74d5b4b3 | ||
|
|
6b2e5645b3 | ||
|
|
f9b581a985 | ||
|
|
789734d8a5 | ||
|
|
d364174ca0 | ||
|
|
b52d3ab9ed | ||
|
|
49d4ba38e0 | ||
|
|
8cc7421faa | ||
|
|
44dbbe5509 | ||
|
|
8b1db4f0d2 | ||
|
|
ae8465532c | ||
|
|
0f00bd14d8 | ||
|
|
ff7c252d63 | ||
|
|
f8aa78d370 | ||
|
|
ef995383a6 | ||
|
|
b39f69ba89 | ||
|
|
3bee435c2a | ||
|
|
9e33ae0486 | ||
|
|
913e8d12d4 | ||
|
|
346e2aeb01 | ||
|
|
b71fa5307e | ||
|
|
e065cd50ed | ||
|
|
5eec7b7478 | ||
|
|
9bdf1e3dee | ||
|
|
d5e550dcc4 | ||
|
|
8fb451b732 | ||
|
|
eca2fd39c0 | ||
|
|
2c9374ac53 | ||
|
|
c9c5bd2470 | ||
|
|
9bc795e7a1 | ||
|
|
7886e59f82 | ||
|
|
a65886ca3f | ||
|
|
d8f3c78135 | ||
|
|
6b9b46a5a4 | ||
|
|
b6b4501e37 | ||
|
|
54658fa3a9 | ||
|
|
eb04bb1082 | ||
|
|
453fde5a3a | ||
|
|
57237b650e | ||
|
|
1f94791240 |
@@ -141,7 +141,7 @@ class SmppHandler:
|
||||
tuple containing the last response data and the last status word as byte strings
|
||||
"""
|
||||
|
||||
logger.info("C-APDU sending: %s..." % b2h(apdu))
|
||||
logger.info("C-APDU sending: %s...", b2h(apdu))
|
||||
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
@@ -194,19 +194,19 @@ if __name__ == '__main__':
|
||||
option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference')
|
||||
option_parser.add_argument("--cntr_req", choices=CNTR_REQ.decmapping.values(), default='no_counter',
|
||||
help="Counter requirement")
|
||||
option_parser.add_argument('--ciphering', default=True, type=bool, help='Enable ciphering')
|
||||
option_parser.add_argument('--no-ciphering', action='store_true', default=False, help='Disable ciphering')
|
||||
option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument('--por-in-submit', default=False, type=bool,
|
||||
option_parser.add_argument('--por-in-submit', action='store_true', default=False,
|
||||
help='require PoR to be sent via SMS-SUBMIT')
|
||||
option_parser.add_argument('--por-shall-be-ciphered', default=True, type=bool, help='require encrypted PoR')
|
||||
option_parser.add_argument('--por-no-ciphering', action='store_true', default=False, help='Disable ciphering (PoR)')
|
||||
option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc',
|
||||
help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)")
|
||||
option_parser.add_argument("--por_req", choices=POR_REQ.decmapping.values(), default='por_required',
|
||||
help="Proof of Receipt requirements")
|
||||
option_parser.add_argument('--src-addr', default='12', type=str, help='TODO')
|
||||
option_parser.add_argument('--dest-addr', default='23', type=str, help='TODO')
|
||||
option_parser.add_argument('--timeout', default=10, type=int, help='TODO')
|
||||
option_parser.add_argument('--src-addr', default='12', type=str, help='SMS source address (MSISDN)')
|
||||
option_parser.add_argument('--dest-addr', default='23', type=str, help='SMS destination address (MSISDN)')
|
||||
option_parser.add_argument('--timeout', default=10, type=int, help='Maximum response waiting time')
|
||||
option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send')
|
||||
opts = option_parser.parse_args()
|
||||
|
||||
@@ -214,18 +214,22 @@ if __name__ == '__main__':
|
||||
format='%(asctime)s %(levelname)s %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
|
||||
if opts.kic_idx != opts.kid_idx:
|
||||
logger.warning("KIC index (%s) and KID index (%s) are different (security violation, card should reject message)",
|
||||
opts.kic_idx, opts.kid_idx)
|
||||
|
||||
ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
|
||||
kic_idx=opts.kic_idx,
|
||||
kic=h2b(opts.kic),
|
||||
algo_auth=opts.algo_auth,
|
||||
kid_idx=opts.kic_idx,
|
||||
kid_idx=opts.kid_idx,
|
||||
kid=h2b(opts.kid),
|
||||
cntr=opts.cntr)
|
||||
spi = {'counter' : opts.cntr_req,
|
||||
'ciphering' : opts.ciphering,
|
||||
'ciphering' : not opts.no_ciphering,
|
||||
'rc_cc_ds': opts.rc_cc_ds,
|
||||
'por_in_submit':opts.por_in_submit,
|
||||
'por_shall_be_ciphered':opts.por_shall_be_ciphered,
|
||||
'por_in_submit': opts.por_in_submit,
|
||||
'por_shall_be_ciphered': not opts.por_no_ciphering,
|
||||
'por_rc_cc_ds': opts.por_rc_cc_ds,
|
||||
'por': opts.por_req}
|
||||
apdu = h2b("".join(opts.apdu))
|
||||
|
||||
@@ -16,12 +16,6 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import requests
|
||||
from klein import Klein
|
||||
from twisted.internet import defer, protocol, ssl, task, endpoints, reactor
|
||||
from twisted.internet.posixbase import PosixReactorBase
|
||||
from pathlib import Path
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import time
|
||||
@@ -129,12 +123,10 @@ class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||
class DownloadOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/downloadOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType
|
||||
}
|
||||
input_mandatory = ['header']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'iccid': param.Iccid,
|
||||
@@ -145,7 +137,6 @@ class DownloadOrder(Es2PlusApiFunction):
|
||||
class ConfirmOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/confirmOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
@@ -153,7 +144,7 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
'smdsAddress': param.SmdsAddress,
|
||||
'releaseFlag': param.ReleaseFlag,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'releaseFlag']
|
||||
input_mandatory = ['iccid', 'releaseFlag']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
'eid': param.Eid,
|
||||
@@ -166,13 +157,12 @@ class ConfirmOrder(Es2PlusApiFunction):
|
||||
class CancelOrder(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/cancelOrder'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
'eid': param.Eid,
|
||||
'matchingId': param.MatchingId,
|
||||
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
|
||||
}
|
||||
input_mandatory = ['header', 'finalProfileStatusIndicator', 'iccid']
|
||||
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -182,10 +172,9 @@ class CancelOrder(Es2PlusApiFunction):
|
||||
class ReleaseProfile(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/releaseProfile'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'iccid': param.Iccid,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid']
|
||||
input_mandatory = ['iccid']
|
||||
output_params = {
|
||||
'header': JsonResponseHeader,
|
||||
}
|
||||
@@ -195,7 +184,6 @@ class ReleaseProfile(Es2PlusApiFunction):
|
||||
class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
|
||||
input_params = {
|
||||
'header': JsonRequestHeader,
|
||||
'eid': param.Eid,
|
||||
'iccid': param.Iccid,
|
||||
'profileType': param.ProfileType,
|
||||
@@ -204,9 +192,10 @@ class HandleDownloadProgressInfo(Es2PlusApiFunction):
|
||||
'notificationPointStatus': param.NotificationPointStatus,
|
||||
'resultData': param.ResultData,
|
||||
}
|
||||
input_mandatory = ['header', 'iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
|
||||
expected_http_status = 204
|
||||
|
||||
|
||||
class Es2pApiClient:
|
||||
"""Main class representing a full ES2+ API client. Has one method for each API function."""
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
|
||||
@@ -217,17 +206,18 @@ class Es2pApiClient:
|
||||
if client_cert:
|
||||
self.session.cert = client_cert
|
||||
|
||||
self.downloadOrder = JsonHttpApiClient(DownloadOrder(), url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = JsonHttpApiClient(ConfirmOrder(), url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = JsonHttpApiClient(CancelOrder(), url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = JsonHttpApiClient(ReleaseProfile(), url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = JsonHttpApiClient(HandleDownloadProgressInfo(), url_prefix, func_req_id, self.session)
|
||||
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
|
||||
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
|
||||
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
|
||||
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
|
||||
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
|
||||
|
||||
def call_downloadOrder(self, data: dict) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(data, self._gen_func_id())
|
||||
@@ -247,116 +237,3 @@ class Es2pApiClient:
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())
|
||||
|
||||
class Es2pApiServerHandlerSmdpp(abc.ABC):
|
||||
"""ES2+ (SMDP+ side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_downloadOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_confirmOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_cancelOrder(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_releaseProfile(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServerHandlerMno(abc.ABC):
|
||||
"""ES2+ (MNO side) API Server handler class. The API user is expected to override the contained methods."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def call_handleDownloadProgressInfo(self, data: dict) -> (dict, str):
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
pass
|
||||
|
||||
class Es2pApiServer(abc.ABC):
|
||||
"""Main class representing a full ES2+ API server. Has one method for each API function."""
|
||||
app = None
|
||||
|
||||
def __init__(self, port: int, interface: str, server_cert: str = None, client_cert_verify: str = None):
|
||||
logger.debug("HTTP SRV: starting ES2+ API server on %s:%s" % (interface, port))
|
||||
self.port = port
|
||||
self.interface = interface
|
||||
if server_cert:
|
||||
self.server_cert = ssl.PrivateCertificate.loadPEM(Path(server_cert).read_text())
|
||||
else:
|
||||
self.server_cert = None
|
||||
if client_cert_verify:
|
||||
self.client_cert_verify = ssl.Certificate.loadPEM(Path(client_cert_verify).read_text())
|
||||
else:
|
||||
self.client_cert_verify = None
|
||||
|
||||
def reactor(self, reactor: PosixReactorBase):
|
||||
logger.debug("HTTP SRV: listen on %s:%s" % (self.interface, self.port))
|
||||
if self.server_cert:
|
||||
if self.client_cert_verify:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(self.client_cert_verify),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenSSL(self.port, Site(self.app.resource()), self.server_cert.options(),
|
||||
interface=self.interface)
|
||||
else:
|
||||
reactor.listenTCP(self.port, Site(self.app.resource()), interface=self.interface)
|
||||
return defer.Deferred()
|
||||
|
||||
class Es2pApiServerSmdpp(Es2pApiServer):
|
||||
"""ES2+ (SMDP+ side) API Server."""
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerSmdpp,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.downloadOrder = JsonHttpApiServer(DownloadOrder(), handler.call_downloadOrder)
|
||||
self.confirmOrder = JsonHttpApiServer(ConfirmOrder(), handler.call_confirmOrder)
|
||||
self.cancelOrder = JsonHttpApiServer(CancelOrder(), handler.call_cancelOrder)
|
||||
self.releaseProfile = JsonHttpApiServer(ReleaseProfile(), handler.call_releaseProfile)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(DownloadOrder.path)
|
||||
def call_downloadOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
|
||||
return self.downloadOrder.call(request)
|
||||
|
||||
@app.route(ConfirmOrder.path)
|
||||
def call_confirmOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
|
||||
return self.confirmOrder.call(request)
|
||||
|
||||
@app.route(CancelOrder.path)
|
||||
def call_cancelOrder(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
|
||||
return self.cancelOrder.call(request)
|
||||
|
||||
@app.route(ReleaseProfile.path)
|
||||
def call_releaseProfile(self, request: Request) -> dict:
|
||||
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
|
||||
return self.releaseProfile.call(request)
|
||||
|
||||
class Es2pApiServerMno(Es2pApiServer):
|
||||
"""ES2+ (MNO side) API Server."""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def __init__(self, port: int, interface: str, handler: Es2pApiServerHandlerMno,
|
||||
server_cert: str = None, client_cert_verify: str = None):
|
||||
super().__init__(port, interface, server_cert, client_cert_verify)
|
||||
self.handler = handler
|
||||
self.handleDownloadProgressInfo = JsonHttpApiServer(HandleDownloadProgressInfo(),
|
||||
handler.call_handleDownloadProgressInfo)
|
||||
task.react(self.reactor)
|
||||
|
||||
@app.route(HandleDownloadProgressInfo.path)
|
||||
def call_handleDownloadProgressInfo(self, request: Request) -> dict:
|
||||
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
|
||||
return self.handleDownloadProgressInfo.call(request)
|
||||
|
||||
@@ -155,11 +155,11 @@ class Es9pApiClient:
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = JsonHttpApiClient(InitiateAuthentication(), url_prefix, '', self.session)
|
||||
self.authenticateClient = JsonHttpApiClient(AuthenticateClient(), url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = JsonHttpApiClient(GetBoundProfilePackage(), url_prefix, '', self.session)
|
||||
self.handleNotification = JsonHttpApiClient(HandleNotification(), url_prefix, '', self.session)
|
||||
self.cancelSession = JsonHttpApiClient(CancelSession(), url_prefix, '', self.session)
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
@@ -21,8 +21,6 @@ import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
import base64
|
||||
from twisted.web.server import Request
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -133,16 +131,6 @@ class JsonResponseHeader(ApiParam):
|
||||
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
||||
raise ValueError('Unknown/unspecified status "%s"' % status)
|
||||
|
||||
class JsonRequestHeader(ApiParam):
|
||||
"""SGP.22 section 6.5.1.3."""
|
||||
@classmethod
|
||||
def verify_decoded(cls, data):
|
||||
func_req_id = data.get('functionRequesterIdentifier')
|
||||
if not func_req_id:
|
||||
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
||||
func_call_id = data.get('functionCallIdentifier')
|
||||
if not func_call_id:
|
||||
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
||||
|
||||
class HttpStatusError(Exception):
|
||||
pass
|
||||
@@ -173,118 +161,65 @@ class ApiError(Exception):
|
||||
|
||||
class JsonHttpApiFunction(abc.ABC):
|
||||
"""Base class for representing an HTTP[s] API Function."""
|
||||
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
||||
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
||||
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
||||
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
||||
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
||||
# prefix.
|
||||
# the below class variables are expected to be overridden in derived classes
|
||||
|
||||
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder')
|
||||
path = None
|
||||
|
||||
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
||||
input_params = {}
|
||||
|
||||
# list of mandatory input parameters
|
||||
input_mandatory = []
|
||||
|
||||
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
||||
output_params = {}
|
||||
|
||||
# list of mandatory output parameters (for successful response)
|
||||
output_mandatory = []
|
||||
|
||||
# list of mandatory output parameters (for failed response)
|
||||
output_mandatory_failed = []
|
||||
|
||||
# expected HTTP status code of the response
|
||||
expected_http_status = 200
|
||||
|
||||
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
||||
http_method = 'POST'
|
||||
|
||||
# additional custom HTTP headers (client requests)
|
||||
extra_http_req_headers = {}
|
||||
|
||||
# additional custom HTTP headers (server responses)
|
||||
extra_http_res_headers = {}
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def __new__(cls, *args, role = 'legacy_client', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
args: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
role: role ('server' or 'client') in which the JsonHttpApiFunction should be created.
|
||||
kwargs: (see JsonHttpApiClient and JsonHttpApiServer)
|
||||
"""
|
||||
|
||||
# Create a dictionary with the class attributes of this class (the properties listed above and the encode_
|
||||
# decode_ methods below). The dictionary will not include any dunder/magic methods
|
||||
cls_attr = {attr_name: getattr(cls, attr_name) for attr_name in dir(cls) if not attr_name.startswith('__')}
|
||||
|
||||
# Normal instantiation as JsonHttpApiFunction:
|
||||
if len(args) == 0 and len(kwargs) == 0:
|
||||
return type(cls.__name__, (abc.ABC,), cls_attr)()
|
||||
|
||||
# Instantiation as as JsonHttpApiFunction with a JsonHttpApiClient or JsonHttpApiServer base
|
||||
if role == 'legacy_client':
|
||||
# Deprecated: With the advent of the server role (JsonHttpApiServer) the API had to be changed. To maintain
|
||||
# compatibility with existing code (out-of-tree) the original behaviour and API interface and behaviour had
|
||||
# to be preserved. Already existing JsonHttpApiFunction definitions will still work and the related objects
|
||||
# may still be created on the original way: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session)
|
||||
logger.warning('implicit role (falling back to legacy JsonHttpApiClient) is deprecated, please specify role explcitly')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
result.legacy = True
|
||||
return result
|
||||
elif role == 'client':
|
||||
# Create a JsonHttpApiFunction in client role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='client')
|
||||
result = type(cls.__name__, (JsonHttpApiClient,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
elif role == 'server':
|
||||
# Create a JsonHttpApiFunction in server role
|
||||
# Example: my_api_func = MyApiFunc(url_prefix, func_req_id, self.session, role='server')
|
||||
result = type(cls.__name__, (JsonHttpApiServer,), cls_attr)(None, *args, **kwargs)
|
||||
result.api_func = result
|
||||
return result
|
||||
else:
|
||||
raise ValueError('Invalid role \'%s\' specified' % role)
|
||||
|
||||
def encode_client(self, data: dict) -> dict:
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
# pySim/esim/http_json_api.py:269:47: E1101: Instance of 'JsonHttpApiFunction' has no 'legacy' member (no-member)
|
||||
# pylint: disable=no-member
|
||||
if hasattr(self, 'legacy') and self.legacy:
|
||||
output[p] = JsonRequestHeader.encode(v)
|
||||
else:
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_client(self, data: dict) -> dict:
|
||||
def decode(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
if 'header' in self.output_params:
|
||||
# let's first do the header, it's special
|
||||
if not 'header' in data:
|
||||
raise ValueError('Mandatory output parameter "header" missing')
|
||||
hdr_class = self.output_params.get('header')
|
||||
output['header'] = hdr_class.decode(data['header'])
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
# we can only expect mandatory parameters to be present in case of successful execution
|
||||
for p in self.output_mandatory:
|
||||
if p == 'header':
|
||||
continue
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
@@ -296,171 +231,35 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def encode_server(self, data: dict) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
||||
output = {}
|
||||
output_mandatory = self.output_mandatory
|
||||
|
||||
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
||||
# different set of mandatory parameters applies.
|
||||
header = data.get('header')
|
||||
if header:
|
||||
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
output_mandatory = self.output_mandatory_failed
|
||||
|
||||
for p in output_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory output parameter %s missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.output_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.encode(v)
|
||||
return output
|
||||
|
||||
def decode_server(self, data: dict) -> dict:
|
||||
"""[further] Decode and validate the JSON-Dict of the request body."""
|
||||
output = {}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
||||
for p, v in data.items():
|
||||
p_class = self.input_params.get(p)
|
||||
if not p_class:
|
||||
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
||||
output[p] = v
|
||||
else:
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
class JsonHttpApiClient():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
||||
session: requests.Session):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
url_prefix : prefix to be put in front of the API function path (see JsonHttpApiFunction)
|
||||
func_req_id : function requestor id to use for requests
|
||||
session : session object (requests)
|
||||
"""
|
||||
self.api_func = api_func
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
||||
json-serializable dict. Output data is returned as json-deserialized dict."""
|
||||
|
||||
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
||||
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
||||
# field is checked by the encode_client method)
|
||||
if func_call_id:
|
||||
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_client(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
url = self.url_prefix + self.path
|
||||
encoded = json.dumps(self.encode(data, func_call_id))
|
||||
req_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
req_headers.update(self.api_func.extra_http_req_headers)
|
||||
req_headers.update(self.extra_http_req_headers)
|
||||
|
||||
# Perform HTTP request
|
||||
url = self.url_prefix + self.api_func.path
|
||||
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
||||
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
response = self.session.request(self.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
||||
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
||||
logger.debug("HTTP RSP: %s" % (response.content))
|
||||
|
||||
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
||||
# SGP.22, section 6.5.1)
|
||||
if response.status_code != self.api_func.expected_http_status:
|
||||
if response.status_code != self.expected_http_status:
|
||||
raise HttpStatusError(response)
|
||||
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
||||
raise HttpHeaderError(response)
|
||||
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
||||
raise HttpHeaderError(response)
|
||||
|
||||
# Decode response and return the result back to the caller
|
||||
if response.content:
|
||||
if response.headers.get('Content-Type').startswith('application/json'):
|
||||
output = self.api_func.decode_client(response.json())
|
||||
return self.decode(response.json())
|
||||
elif response.headers.get('Content-Type').startswith('text/plain;charset=UTF-8'):
|
||||
output = { 'data': response.content.decode('utf-8') }
|
||||
else:
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
return { 'data': response.content.decode('utf-8') }
|
||||
raise HttpHeaderError(f'unimplemented response Content-Type: {response.headers=!r}')
|
||||
|
||||
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
||||
# (the presence of the header field is checked by the decode_client method)
|
||||
if 'header' in output:
|
||||
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
||||
raise ApiError(output['header']['functionExecutionStatus'])
|
||||
return output
|
||||
return None
|
||||
|
||||
class JsonHttpApiServer():
|
||||
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
||||
"""
|
||||
Args:
|
||||
api_func : API function definition (JsonHttpApiFunction)
|
||||
call_handler : handler function to process the request. This function must accept the
|
||||
decoded request as a dictionary. The handler function must return a tuple consisting
|
||||
of the response in the form of a dictionary (may be empty), and a function execution
|
||||
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
||||
"""
|
||||
self.api_func = api_func
|
||||
if call_handler:
|
||||
self.call_handler = call_handler
|
||||
else:
|
||||
self.call_handler = self.default_handler
|
||||
|
||||
def default_handler(self, data: dict) -> (dict, str):
|
||||
"""default handler, used in case no call handler is provided."""
|
||||
logger.error("no handler function for request: %s" % str(data))
|
||||
return {}, 'Failed'
|
||||
|
||||
def call(self, request: Request) -> str:
|
||||
""" Process an incoming request.
|
||||
Args:
|
||||
request : request object as received using twisted.web.server
|
||||
Returns:
|
||||
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
||||
provided the request object)
|
||||
"""
|
||||
|
||||
# Make sure the request is done with the correct HTTP method
|
||||
if (request.method.decode() != self.api_func.http_method):
|
||||
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
||||
|
||||
# Decode the request
|
||||
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
||||
|
||||
# Run call handler (see above)
|
||||
data, fe_status = self.call_handler(decoded_request)
|
||||
|
||||
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
||||
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
||||
if fe_status:
|
||||
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
||||
|
||||
# Encode the message (the presence of mandatory fields is checked during encoding)
|
||||
encoded = json.dumps(self.api_func.encode_server(data))
|
||||
|
||||
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
||||
res_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
||||
}
|
||||
res_headers.update(self.api_func.extra_http_res_headers)
|
||||
for header, value in res_headers.items():
|
||||
request.setHeader(header, value)
|
||||
request.setResponseCode(self.api_func.expected_http_status)
|
||||
|
||||
# Return the encoded result back to the caller for sending (using twisted/klein)
|
||||
return encoded
|
||||
|
||||
|
||||
@@ -55,22 +55,6 @@ class ClassVarMeta(abc.ABCMeta):
|
||||
setattr(x, k, v)
|
||||
return x
|
||||
|
||||
def file_tuples_content_as_bytes(l: List[Tuple]) -> Optional[bytes]:
|
||||
"""linearize a list of fillFileContent / fillFileOffset tuples into a stream of bytes."""
|
||||
stream = io.BytesIO()
|
||||
for k, v in l:
|
||||
if k == 'doNotCreate':
|
||||
return None
|
||||
if k == 'fileDescriptor':
|
||||
pass
|
||||
elif k == 'fillFileOffset':
|
||||
stream.seek(v, os.SEEK_CUR)
|
||||
elif k == 'fillFileContent':
|
||||
stream.write(v)
|
||||
else:
|
||||
return ValueError("Unknown key '%s' in tuple list" % k)
|
||||
return stream.getvalue()
|
||||
|
||||
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
|
||||
r"""Base class representing a part of the eSIM profile that is configurable during the
|
||||
personalization process (with dynamic data from elsewhere).
|
||||
|
||||
@@ -859,28 +859,22 @@ class ADF_SD(CardADF):
|
||||
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
||||
self._cmd.poutput("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!" % (total_size, block_nr))
|
||||
|
||||
install_cap_parser = argparse.ArgumentParser(usage='%(prog)s FILE [--install-parameters | --install-parameters-*]')
|
||||
install_cap_parser = argparse.ArgumentParser()
|
||||
install_cap_parser.add_argument('cap_file', type=str, metavar='FILE',
|
||||
help='JAVA-CARD CAP file to install')
|
||||
# Ideally, the parser should enforce that:
|
||||
# * either the `--install-parameters` is given alone,
|
||||
# * or distinct `--install-parameters-*` are optionally given instead.
|
||||
# We tried to achieve this using mutually exclusive groups (add_mutually_exclusive_group).
|
||||
# However, group nesting was never supported, often failed to work correctly, and was unintentionally
|
||||
# exposed through inheritance. It has been deprecated since version 3.11, removed in version 3.14.
|
||||
# Hence, we have to implement the enforcement manually.
|
||||
install_cap_parser_inst_prm_grp = install_cap_parser.add_argument_group('Install Parameters')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters', type=is_hexstr, default=None,
|
||||
help='install Parameters (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-non-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='non volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_grp.add_argument('--install-parameters-stk',
|
||||
type=is_hexstr, default=None,
|
||||
help='Load Parameters (ETSI TS 102 226, section 8.2.1.3.2.1)')
|
||||
install_cap_parser_inst_prm_g = install_cap_parser.add_mutually_exclusive_group()
|
||||
install_cap_parser_inst_prm_g.add_argument('--install-parameters', type=is_hexstr, default=None,
|
||||
help='install Parameters (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp = install_cap_parser_inst_prm_g.add_argument_group()
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-non-volatile-memory-quota',
|
||||
type=int, default=None,
|
||||
help='non volatile memory quota (GPC_SPE_034, section 11.5.2.3.7, table 11-49)')
|
||||
install_cap_parser_inst_prm_g_grp.add_argument('--install-parameters-stk',
|
||||
type=is_hexstr, default=None,
|
||||
help='Load Parameters (ETSI TS 102 226, section 8.2.1.3.2.1)')
|
||||
|
||||
@cmd2.with_argparser(install_cap_parser)
|
||||
def do_install_cap(self, opts):
|
||||
@@ -894,17 +888,9 @@ class ADF_SD(CardADF):
|
||||
load_file_aid = cap.get_loadfile_aid()
|
||||
module_aid = cap.get_applet_aid()
|
||||
application_aid = module_aid
|
||||
if opts.install_parameters is not None:
|
||||
# `--install-parameters` and `--install-parameters-*` are mutually exclusive
|
||||
# make sure that none of `--install-parameters-*` is given; abort otherwise
|
||||
if any(p is not None for p in [opts.install_parameters_non_volatile_memory_quota,
|
||||
opts.install_parameters_volatile_memory_quota,
|
||||
opts.install_parameters_stk]):
|
||||
self.install_cap_parser.error('arguments --install-parameters-* are '
|
||||
'not allowed with --install-parameters')
|
||||
if opts.install_parameters:
|
||||
install_parameters = opts.install_parameters;
|
||||
else:
|
||||
# `--install-parameters-*` are all optional
|
||||
install_parameters = gen_install_parameters(opts.install_parameters_non_volatile_memory_quota,
|
||||
opts.install_parameters_volatile_memory_quota,
|
||||
opts.install_parameters_stk)
|
||||
|
||||
Reference in New Issue
Block a user