mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
410 lines
16 KiB
Python
410 lines
16 KiB
Python
"""GSMA eSIM RSP HTTP/REST/JSON interface according to SGP.22 v2.5"""
|
|
|
|
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import abc
|
|
import requests
|
|
import logging
|
|
import json
|
|
from typing import Optional
|
|
import base64
|
|
from twisted.web.server import Request
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
class ApiParam(abc.ABC):
|
|
"""A class representing a single parameter in the API."""
|
|
@classmethod
|
|
def verify_decoded(cls, data):
|
|
"""Verify the decoded representation of a value. Should raise an exception if something is odd."""
|
|
pass
|
|
|
|
@classmethod
|
|
def verify_encoded(cls, data):
|
|
"""Verify the encoded representation of a value. Should raise an exception if something is odd."""
|
|
pass
|
|
|
|
@classmethod
|
|
def encode(cls, data):
|
|
"""[Validate and] Encode the given value."""
|
|
cls.verify_decoded(data)
|
|
encoded = cls._encode(data)
|
|
cls.verify_decoded(encoded)
|
|
return encoded
|
|
|
|
@classmethod
|
|
def _encode(cls, data):
|
|
"""encoder function, typically [but not always] overridden by derived class."""
|
|
return data
|
|
|
|
@classmethod
|
|
def decode(cls, data):
|
|
"""[Validate and] Decode the given value."""
|
|
cls.verify_encoded(data)
|
|
decoded = cls._decode(data)
|
|
cls.verify_decoded(decoded)
|
|
return decoded
|
|
|
|
@classmethod
|
|
def _decode(cls, data):
|
|
"""decoder function, typically [but not always] overridden by derived class."""
|
|
return data
|
|
|
|
class ApiParamString(ApiParam):
|
|
"""Base class representing an API parameter of 'string' type."""
|
|
pass
|
|
|
|
|
|
class ApiParamInteger(ApiParam):
|
|
"""Base class representing an API parameter of 'integer' type."""
|
|
@classmethod
|
|
def _decode(cls, data):
|
|
return int(data)
|
|
|
|
@classmethod
|
|
def _encode(cls, data):
|
|
return str(data)
|
|
|
|
@classmethod
|
|
def verify_decoded(cls, data):
|
|
if not isinstance(data, int):
|
|
raise TypeError('Expected an integer input data type')
|
|
|
|
@classmethod
|
|
def verify_encoded(cls, data):
|
|
if isinstance(data, int):
|
|
return
|
|
if not data.isdecimal():
|
|
raise ValueError('integer (%s) contains non-decimal characters' % data)
|
|
assert str(int(data)) == data
|
|
|
|
class ApiParamBoolean(ApiParam):
|
|
"""Base class representing an API parameter of 'boolean' type."""
|
|
@classmethod
|
|
def _encode(cls, data):
|
|
return bool(data)
|
|
|
|
class ApiParamFqdn(ApiParam):
|
|
"""String, as a list of domain labels concatenated using the full stop (dot, period) character as
|
|
separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5
|
|
of ISO/IEC 18004"""
|
|
@classmethod
|
|
def verify_encoded(cls, data):
|
|
# FIXME
|
|
pass
|
|
|
|
class ApiParamBase64(ApiParam):
|
|
@classmethod
|
|
def _decode(cls, data):
|
|
return base64.b64decode(data)
|
|
|
|
@classmethod
|
|
def _encode(cls, data):
|
|
return base64.b64encode(data).decode('ascii')
|
|
|
|
class SmdpAddress(ApiParamFqdn):
|
|
pass
|
|
|
|
class JsonResponseHeader(ApiParam):
|
|
"""SGP.22 section 6.5.1.4."""
|
|
@classmethod
|
|
def verify_decoded(cls, data):
|
|
fe_status = data.get('functionExecutionStatus')
|
|
if not fe_status:
|
|
raise ValueError('Missing mandatory functionExecutionStatus in header')
|
|
status = fe_status.get('status')
|
|
if not status:
|
|
raise ValueError('Missing mandatory status in header functionExecutionStatus')
|
|
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
|
|
raise ValueError('Unknown/unspecified status "%s"' % status)
|
|
|
|
class JsonRequestHeader(ApiParam):
|
|
"""SGP.22 section 6.5.1.3."""
|
|
@classmethod
|
|
def verify_decoded(cls, data):
|
|
func_req_id = data.get('functionRequesterIdentifier')
|
|
if not func_req_id:
|
|
raise ValueError('Missing mandatory functionRequesterIdentifier in header')
|
|
func_call_id = data.get('functionCallIdentifier')
|
|
if not func_call_id:
|
|
raise ValueError('Missing mandatory functionCallIdentifier in header')
|
|
|
|
class HttpStatusError(Exception):
|
|
pass
|
|
|
|
class HttpHeaderError(Exception):
|
|
pass
|
|
|
|
class ApiError(Exception):
|
|
"""Exception representing an error at the API level (status != Executed)."""
|
|
def __init__(self, func_ex_status: dict):
|
|
self.status = func_ex_status['status']
|
|
sec = {
|
|
'subjectCode': None,
|
|
'reasonCode': None,
|
|
'subjectIdentifier': None,
|
|
'message': None,
|
|
}
|
|
actual_sec = func_ex_status.get('statusCodeData', None)
|
|
if actual_sec:
|
|
sec.update(actual_sec)
|
|
self.subject_code = sec['subjectCode']
|
|
self.reason_code = sec['reasonCode']
|
|
self.subject_id = sec['subjectIdentifier']
|
|
self.message = sec['message']
|
|
|
|
def __str__(self):
|
|
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
|
|
|
|
class JsonHttpApiFunction(abc.ABC):
|
|
"""Base class for representing an HTTP[s] API Function."""
|
|
# The below class variables are used to describe the properties of the API function. Derived classes are expected
|
|
# to orverride those class properties with useful values. The prefixes "input_" and "output_" refer to the API
|
|
# function from an abstract point of view. Seen from the client perspective, "input_" will refer to parameters the
|
|
# client sends to a HTTP server. Seen from the server perspective, "input_" will refer to parameters the server
|
|
# receives from the a requesting client. The same applies vice versa to class variables that have an "output_"
|
|
# prefix.
|
|
|
|
# path of the API function (e.g. '/gsma/rsp2/es2plus/confirmOrder')
|
|
path = None
|
|
|
|
# dictionary of input parameters. key is parameter name, value is ApiParam class
|
|
input_params = {}
|
|
|
|
# list of mandatory input parameters
|
|
input_mandatory = []
|
|
|
|
# dictionary of output parameters. key is parameter name, value is ApiParam class
|
|
output_params = {}
|
|
|
|
# list of mandatory output parameters (for successful response)
|
|
output_mandatory = []
|
|
|
|
# list of mandatory output parameters (for failed response)
|
|
output_mandatory_failed = []
|
|
|
|
# expected HTTP status code of the response
|
|
expected_http_status = 200
|
|
|
|
# the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
|
|
http_method = 'POST'
|
|
|
|
# additional custom HTTP headers (client requests)
|
|
extra_http_req_headers = {}
|
|
|
|
# additional custom HTTP headers (server responses)
|
|
extra_http_res_headers = {}
|
|
|
|
def encode_client(self, data: dict) -> dict:
|
|
"""Validate an encode input dict into JSON-serializable dict for request body."""
|
|
output = {}
|
|
|
|
for p in self.input_mandatory:
|
|
if not p in data:
|
|
raise ValueError('Mandatory input parameter %s missing' % p)
|
|
for p, v in data.items():
|
|
p_class = self.input_params.get(p)
|
|
if not p_class:
|
|
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
|
|
output[p] = v
|
|
else:
|
|
output[p] = p_class.encode(v)
|
|
return output
|
|
|
|
def decode_client(self, data: dict) -> dict:
|
|
"""[further] Decode and validate the JSON-Dict of the response body."""
|
|
output = {}
|
|
output_mandatory = self.output_mandatory
|
|
|
|
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
|
# different set of mandatory parameters applies.
|
|
header = data.get('header')
|
|
if header:
|
|
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
|
output_mandatory = self.output_mandatory_failed
|
|
|
|
for p in output_mandatory:
|
|
if not p in data:
|
|
raise ValueError('Mandatory output parameter "%s" missing' % p)
|
|
for p, v in data.items():
|
|
p_class = self.output_params.get(p)
|
|
if not p_class:
|
|
logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v)
|
|
output[p] = v
|
|
else:
|
|
output[p] = p_class.decode(v)
|
|
return output
|
|
|
|
def encode_server(self, data: dict) -> dict:
|
|
"""Validate an encode input dict into JSON-serializable dict for response body."""
|
|
output = {}
|
|
output_mandatory = self.output_mandatory
|
|
|
|
# In case a provided header (may be optional) indicates that the API function call was unsuccessful, a
|
|
# different set of mandatory parameters applies.
|
|
header = data.get('header')
|
|
if header:
|
|
if data['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
|
output_mandatory = self.output_mandatory_failed
|
|
|
|
for p in output_mandatory:
|
|
if not p in data:
|
|
raise ValueError('Mandatory output parameter %s missing' % p)
|
|
for p, v in data.items():
|
|
p_class = self.output_params.get(p)
|
|
if not p_class:
|
|
logger.warning('Unexpected/unsupported output parameter %s=%s', p, v)
|
|
output[p] = v
|
|
else:
|
|
output[p] = p_class.encode(v)
|
|
return output
|
|
|
|
def decode_server(self, data: dict) -> dict:
|
|
"""[further] Decode and validate the JSON-Dict of the request body."""
|
|
output = {}
|
|
|
|
for p in self.input_mandatory:
|
|
if not p in data:
|
|
raise ValueError('Mandatory input parameter "%s" missing' % p)
|
|
for p, v in data.items():
|
|
p_class = self.input_params.get(p)
|
|
if not p_class:
|
|
logger.warning('Unexpected/unsupported input parameter "%s"="%s"', p, v)
|
|
output[p] = v
|
|
else:
|
|
output[p] = p_class.decode(v)
|
|
return output
|
|
|
|
class JsonHttpApiClient():
|
|
def __init__(self, api_func: JsonHttpApiFunction, url_prefix: str, func_req_id: Optional[str],
|
|
session: requests.Session):
|
|
self.api_func = api_func
|
|
self.url_prefix = url_prefix
|
|
self.func_req_id = func_req_id
|
|
self.session = session
|
|
|
|
def call(self, data: dict, func_call_id: Optional[str] = None, timeout=10) -> Optional[dict]:
|
|
"""Make an API call to the HTTP API endpoint represented by this object. Input data is passed in `data` as
|
|
json-serializable dict. Output data is returned as json-deserialized dict."""
|
|
|
|
# In case a function caller ID is supplied, use it together with the stored function requestor ID to generate
|
|
# and prepend the header field according to SGP.22, section 6.5.1.1 and 6.5.1.3. (the presence of the header
|
|
# field is checked by the encode_client method)
|
|
if func_call_id:
|
|
data = {'header' : {'functionRequesterIdentifier': self.func_req_id,
|
|
'functionCallIdentifier': func_call_id}} | data
|
|
|
|
# Encode the message (the presence of mandatory fields is checked during encoding)
|
|
encoded = json.dumps(self.api_func.encode_client(data))
|
|
|
|
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
|
req_headers = {
|
|
'Content-Type': 'application/json',
|
|
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
|
}
|
|
req_headers.update(self.api_func.extra_http_req_headers)
|
|
|
|
# Perform HTTP request
|
|
url = self.url_prefix + self.api_func.path
|
|
logger.debug("HTTP REQ %s - hdr: %s '%s'" % (url, req_headers, encoded))
|
|
response = self.session.request(self.api_func.http_method, url, data=encoded, headers=req_headers, timeout=timeout)
|
|
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
|
|
logger.debug("HTTP RSP: %s" % (response.content))
|
|
|
|
# Check HTTP response status code and make sure that the returned HTTP headers look plausible (according to
|
|
# SGP.22, section 6.5.1)
|
|
if response.status_code != self.api_func.expected_http_status:
|
|
raise HttpStatusError(response)
|
|
if not response.headers.get('Content-Type').startswith(req_headers['Content-Type']):
|
|
raise HttpHeaderError(response)
|
|
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
|
|
raise HttpHeaderError(response)
|
|
|
|
# Decode response and return the result back to the caller
|
|
if response.content:
|
|
output = self.api_func.decode_client(response.json())
|
|
# In case the response contains a header, check it to make sure that the API call was executed successfully
|
|
# (the presence of the header field is checked by the decode_client method)
|
|
if 'header' in output:
|
|
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
|
|
raise ApiError(output['header']['functionExecutionStatus'])
|
|
return output
|
|
return None
|
|
|
|
class JsonHttpApiServer():
|
|
def __init__(self, api_func: JsonHttpApiFunction, call_handler = None):
|
|
"""
|
|
Args:
|
|
api_func : API function definition (JsonHttpApiFunction)
|
|
call_handler : handler function to process the request. This function must accept the
|
|
decoded request as a dictionary. The handler function must return a tuple consisting
|
|
of the response in the form of a dictionary (may be empty), and a function execution
|
|
status string ('Executed-Success', 'Executed-WithWarning', 'Failed' or 'Expired')
|
|
"""
|
|
self.api_func = api_func
|
|
if call_handler:
|
|
self.call_handler = call_handler
|
|
else:
|
|
self.call_handler = self.default_handler
|
|
|
|
def default_handler(self, data: dict) -> (dict, str):
|
|
"""default handler, used in case no call handler is provided."""
|
|
logger.error("no handler function for request: %s" % str(data))
|
|
return {}, 'Failed'
|
|
|
|
def call(self, request: Request) -> str:
|
|
""" Process an incoming request.
|
|
Args:
|
|
request : request object as received using twisted.web.server
|
|
Returns:
|
|
encoded JSON string (HTTP response code and headers are set by calling the appropriate methods on the
|
|
provided the request object)
|
|
"""
|
|
|
|
# Make sure the request is done with the correct HTTP method
|
|
if (request.method.decode() != self.api_func.http_method):
|
|
raise ValueError('Wrong HTTP method %s!=%s' % (request.method.decode(), self.api_func.http_method))
|
|
|
|
# Decode the request
|
|
decoded_request = self.api_func.decode_server(json.loads(request.content.read()))
|
|
|
|
# Run call handler (see above)
|
|
data, fe_status = self.call_handler(decoded_request)
|
|
|
|
# In case a function execution status is returned, use it to generate and prepend the header field according to
|
|
# SGP.22, section 6.5.1.2 and 6.5.1.4 (the presence of the header filed is checked by the encode_server method)
|
|
if fe_status:
|
|
data = {'header' : {'functionExecutionStatus': {'status' : fe_status}}} | data
|
|
|
|
# Encode the message (the presence of mandatory fields is checked during encoding)
|
|
encoded = json.dumps(self.api_func.encode_server(data))
|
|
|
|
# Apply HTTP request headers according to SGP.22, section 6.5.1
|
|
res_headers = {
|
|
'Content-Type': 'application/json',
|
|
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
|
|
}
|
|
res_headers.update(self.api_func.extra_http_res_headers)
|
|
for header, value in res_headers.items():
|
|
request.setHeader(header, value)
|
|
request.setResponseCode(self.api_func.expected_http_status)
|
|
|
|
# Return the encoded result back to the caller for sending (using twisted/klein)
|
|
return encoded
|
|
|