mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
esim.es9p: Suppress sending requestHeader on ES9+
SGP.22 states that ES9+ should not include a requestHeader Change-Id: Ic9aa874a82241d7b26e2bcb0423961173e103020
This commit is contained in:
@@ -39,7 +39,6 @@ logging.basicConfig(level=logging.DEBUG)
|
||||
parser = argparse.ArgumentParser(description="""
|
||||
Utility to manually issue requests against the ES9+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||
parser.add_argument('--url', required=True, help='Base URL of ES9+ API endpoint')
|
||||
parser.add_argument('--id', default='osmocom pySim', help='Entity identifier passed to SM-DP+')
|
||||
parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In
|
||||
production use cases, this would be the GSMA Root CA (CI) certificate.""")
|
||||
subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call", required=True)
|
||||
@@ -87,7 +86,7 @@ def do_download(opts):
|
||||
print("CI PKID: %s" % b2h(ci_pkid))
|
||||
print()
|
||||
|
||||
peer = es9p.Es9pApiClient(opts.url, opts.id, server_cert_verify=opts.server_ca_cert)
|
||||
peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert)
|
||||
|
||||
print("Step 1: InitiateAuthentication...")
|
||||
|
||||
|
||||
@@ -148,35 +148,29 @@ class CancelSession(Es9PlusApiFunction):
|
||||
input_mandatory = ['transactionId', 'cancelSessionResponse']
|
||||
|
||||
class Es9pApiClient:
|
||||
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None):
|
||||
self.func_id = 0
|
||||
def __init__(self, url_prefix:str, server_cert_verify: str = None):
|
||||
self.session = requests.Session()
|
||||
self.session.verify = False # FIXME HACK
|
||||
if server_cert_verify:
|
||||
self.session.verify = server_cert_verify
|
||||
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, func_req_id, self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, func_req_id, self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, func_req_id, self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, func_req_id, self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, func_req_id, self.session)
|
||||
|
||||
def _gen_func_id(self) -> str:
|
||||
"""Generate the next function call id."""
|
||||
self.func_id += 1
|
||||
return 'FCI-%u-%u' % (time.time(), self.func_id)
|
||||
self.initiateAuthentication = InitiateAuthentication(url_prefix, '', self.session)
|
||||
self.authenticateClient = AuthenticateClient(url_prefix, '', self.session)
|
||||
self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, '', self.session)
|
||||
self.handleNotification = HandleNotification(url_prefix, '', self.session)
|
||||
self.cancelSession = CancelSession(url_prefix, '', self.session)
|
||||
|
||||
def call_initiateAuthentication(self, data: dict) -> dict:
|
||||
return self.initiateAuthentication.call(data, self._gen_func_id())
|
||||
return self.initiateAuthentication.call(data)
|
||||
|
||||
def call_authenticateClient(self, data: dict) -> dict:
|
||||
return self.authenticateClient.call(data, self._gen_func_id())
|
||||
return self.authenticateClient.call(data)
|
||||
|
||||
def call_getBoundProfilePackage(self, data: dict) -> dict:
|
||||
return self.getBoundProfilePackage.call(data, self._gen_func_id())
|
||||
return self.getBoundProfilePackage.call(data)
|
||||
|
||||
def call_handleNotification(self, data: dict) -> dict:
|
||||
return self.handleNotification.call(data, self._gen_func_id())
|
||||
return self.handleNotification.call(data)
|
||||
|
||||
def call_cancelSession(self, data: dict) -> dict:
|
||||
return self.cancelSession.call(data, self._gen_func_id())
|
||||
return self.cancelSession.call(data)
|
||||
|
||||
@@ -19,6 +19,7 @@ import abc
|
||||
import requests
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
import base64
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -176,19 +177,20 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
http_method = 'POST'
|
||||
extra_http_req_headers = {}
|
||||
|
||||
def __init__(self, url_prefix: str, func_req_id: str, session: requests.Session):
|
||||
def __init__(self, url_prefix: str, func_req_id: Optional[str], session: requests.Session):
|
||||
self.url_prefix = url_prefix
|
||||
self.func_req_id = func_req_id
|
||||
self.session = session
|
||||
|
||||
def encode(self, data: dict, func_call_id: str) -> dict:
|
||||
def encode(self, data: dict, func_call_id: Optional[str] = None) -> dict:
|
||||
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
||||
output = {
|
||||
'header': {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
}
|
||||
output = {}
|
||||
if func_call_id:
|
||||
output['header'] = {
|
||||
'functionRequesterIdentifier': self.func_req_id,
|
||||
'functionCallIdentifier': func_call_id
|
||||
}
|
||||
|
||||
for p in self.input_mandatory:
|
||||
if not p in data:
|
||||
raise ValueError('Mandatory input parameter %s missing' % p)
|
||||
@@ -227,7 +229,7 @@ class JsonHttpApiFunction(abc.ABC):
|
||||
output[p] = p_class.decode(v)
|
||||
return output
|
||||
|
||||
def call(self, data: dict, func_call_id:str, timeout=10) -> dict:
|
||||
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> dict:
|
||||
"""Make an API call to the HTTP API endpoint represented by this object.
|
||||
Input data is passed in `data` as json-serializable dict. Output data
|
||||
is returned as json-deserialized dict."""
|
||||
|
||||
Reference in New Issue
Block a user