"""GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5""" # (C) 2024 by Harald Welte # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import abc import requests import logging import json from datetime import datetime import time import base64 logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) class ApiParam(abc.ABC): """A class reprsenting a single parameter in the ES2+ API.""" @classmethod def verify_decoded(cls, data): """Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd.""" pass @classmethod def verify_encoded(cls, data): """Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd.""" pass @classmethod def encode(cls, data): """[Validate and] Encode the given value.""" cls.verify_decoded(data) encoded = cls._encode(data) cls.verify_decoded(encoded) return encoded @classmethod def _encode(cls, data): """encoder function, typically [but not always] overridden by derived class.""" return data @classmethod def decode(cls, data): """[Validate and] Decode the given value.""" cls.verify_encoded(data) decoded = cls._decode(data) cls.verify_decoded(decoded) return decoded @classmethod def _decode(cls, data): """decoder function, typically [but not always] overridden by derived class.""" return data class ApiParamString(ApiParam): """Base class representing an API parameter of 'string' type.""" pass class ApiParamInteger(ApiParam): """Base class representing an API parameter of 'integer' type.""" @classmethod def _decode(cls, data): return int(data) @classmethod def _encode(cls, data): return str(data) @classmethod def verify_decoded(cls, data): if not isinstance(data, int): raise TypeError('Expected an integer input data type') @classmethod def verify_encoded(cls, data): if not data.isdecimal(): raise ValueError('integer (%s) contains non-decimal characters' % data) assert str(int(data)) == data class ApiParamBoolean(ApiParam): """Base class representing an API parameter of 'boolean' type.""" @classmethod def _encode(cls, data): return bool(data) class ApiParamFqdn(ApiParam): """String, as a list of domain labels concatenated using the full stop (dot, period) character as separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5 of ISO/IEC 18004""" @classmethod def verify_encoded(cls, data): # FIXME pass class param: class Iccid(ApiParamString): """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding character F.""" @classmethod def _encode(cls, data): data = str(data) # SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always # encode it with padding F at the end. if len(data) == 19: data += 'F' return data @classmethod def verify_encoded(cls, data): if len(data) not in [19, 20]: raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) @classmethod def _decode(cls, data): # strip trailing padding (if it's 20 digits) if len(data) == 20 and data[-1] in ['F', 'f']: data = data[:-1] return data @classmethod def verify_decoded(cls, data): data = str(data) if len(data) not in [19, 20]: raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) if len(data) == 19: decimal_part = data else: decimal_part = data[:-1] final_part = data[-1:] if final_part not in ['F', 'f'] and not final_part.isdecimal(): raise ValueError('ICCID (%s) contains non-decimal characters' % data) if not decimal_part.isdecimal(): raise ValueError('ICCID (%s) contains non-decimal characters' % data) class Eid(ApiParamString): """String of 32 decimal characters""" @classmethod def verify_encoded(cls, data): if len(data) != 32: raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data))) @classmethod def verify_decoded(cls, data): if not data.isdecimal(): raise ValueError('EID (%s) contains non-decimal characters' % data) class ProfileType(ApiParamString): pass class MatchingId(ApiParamString): pass class ConfirmationCode(ApiParamString): pass class SmdsAddress(ApiParamFqdn): pass class SmdpAddress(ApiParamFqdn): pass class ReleaseFlag(ApiParamBoolean): pass class FinalProfileStatusIndicator(ApiParamString): pass class Timestamp(ApiParamString): """String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD""" @classmethod def _decode(cls, data): return datetime.fromisoformat(data) @classmethod def _encode(cls, data): return datetime.toisoformat(data) class NotificationPointId(ApiParamInteger): pass class NotificationPointStatus(ApiParam): pass class ResultData(ApiParam): @classmethod def _decode(cls, data): return base64.b64decode(data) @classmethod def _encode(cls, data): return base64.b64encode(data) class JsonResponseHeader(ApiParam): """SGP.22 section 6.5.1.4.""" @classmethod def verify_decoded(cls, data): fe_status = data.get('functionExecutionStatus') if not fe_status: raise ValueError('Missing mandatory functionExecutionStatus in header') status = fe_status.get('status') if not status: raise ValueError('Missing mandatory status in header functionExecutionStatus') if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']: raise ValueError('Unknown/unspecified status "%s"' % status) class HttpStatusError(Exception): pass class HttpHeaderError(Exception): pass class Es2PlusApiError(Exception): """Exception representing an error at the ES2+ API level (status != Executed).""" def __init__(self, func_ex_status: dict): self.status = func_ex_status['status'] sec = { 'subjectCode': None, 'reasonCode': None, 'subjectIdentifier': None, 'message': None, } actual_sec = func_ex_status.get('statusCodeData', None) sec.update(actual_sec) self.subject_code = sec['subjectCode'] self.reason_code = sec['reasonCode'] self.subject_id = sec['subjectIdentifier'] self.message = sec['message'] def __str__(self): return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' class Es2PlusApiFunction(abc.ABC): """Base classs for representing an ES2+ API Function.""" # the below class variables are expected to be overridden in derived classes path = None # dictionary of input parameters. key is parameter name, value is ApiParam class input_params = {} # list of mandatory input parameters input_mandatory = [] # dictionary of output parameters. key is parameter name, value is ApiParam class output_params = {} # list of mandatory output parameters (for successful response) output_mandatory = [] # expected HTTP status code of the response expected_http_status = 200 def __init__(self, url_prefix: str, func_req_id: str, session): self.url_prefix = url_prefix self.func_req_id = func_req_id self.session = session def encode(self, data: dict, func_call_id: str) -> dict: """Validate an encode input dict into JSON-serializable dict for request body.""" output = { 'header': { 'functionRequesterIdentifier': self.func_req_id, 'functionCallIdentifier': func_call_id } } for p in self.input_mandatory: if not p in data: raise ValueError('Mandatory input parameter %s missing' % p) for p, v in data.items(): p_class = self.input_params.get(p) if not p_class: logger.warning('Unexpected/unsupported input parameter %s=%s', p, v) output[p] = v else: output[p] = p_class.encode(v) return output def decode(self, data: dict) -> dict: """[further] Decode and validate the JSON-Dict of the respnse body.""" output = {} # let's first do the header, it's special if not 'header' in data: raise ValueError('Mandatory output parameter "header" missing') hdr_class = self.output_params.get('header') output['header'] = hdr_class.decode(data['header']) if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']: raise Es2PlusApiError(output['header']['functionExecutionStatus']) # we can only expect mandatory parameters to be present in case of successful execution for p in self.output_mandatory: if p == 'header': continue if not p in data: raise ValueError('Mandatory output parameter "%s" missing' % p) for p, v in data.items(): p_class = self.output_params.get(p) if not p_class: logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v) output[p] = v else: output[p] = p_class.decode(v) return output def call(self, data: dict, func_call_id:str, timeout=10) -> dict: """Make an API call to the ES2+ API endpoint represented by this object. Input data is passed in `data` as json-serializable dict. Output data is returned as json-deserialized dict.""" url = self.url_prefix + self.path encoded = json.dumps(self.encode(data, func_call_id)) headers = { 'Content-Type': 'application/json', 'X-Admin-Protocol': 'gsma/rsp/v2.5.0', } logger.debug("HTTP REQ %s - '%s'" % (url, encoded)) response = self.session.post(url, data=encoded, headers=headers, timeout=timeout) logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers)) logger.debug("HTTP RSP: %s" % (response.content)) if response.status_code != self.expected_http_status: raise HttpStatusError(response) if not response.headers.get('Content-Type').startswith(headers['Content-Type']): raise HttpHeaderError(response) if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'): raise HttpHeaderError(response) return self.decode(response.json()) # ES2+ DownloadOrder function (SGP.22 section 5.3.1) class DownloadOrder(Es2PlusApiFunction): path = '/gsma/rsp2/es2plus/downloadOrder' input_params = { 'eid': param.Eid, 'iccid': param.Iccid, 'profileType': param.ProfileType } output_params = { 'header': param.JsonResponseHeader, 'iccid': param.Iccid, } output_mandatory = ['header', 'iccid'] # ES2+ ConfirmOrder function (SGP.22 section 5.3.2) class ConfirmOrder(Es2PlusApiFunction): path = '/gsma/rsp2/es2plus/confirmOrder' input_params = { 'iccid': param.Iccid, 'eid': param.Eid, 'matchingId': param.MatchingId, 'confirmationCode': param.ConfirmationCode, 'smdsAddress': param.SmdsAddress, 'releaseFlag': param.ReleaseFlag, } input_mandatory = ['iccid', 'releaseFlag'] output_params = { 'header': param.JsonResponseHeader, 'eid': param.Eid, 'matchingId': param.MatchingId, 'smdpAddress': param.SmdpAddress, } output_mandatory = ['header', 'matchingId'] # ES2+ CancelOrder function (SGP.22 section 5.3.3) class CancelOrder(Es2PlusApiFunction): path = '/gsma/rsp2/es2plus/cancelOrder' input_params = { 'iccid': param.Iccid, 'eid': param.Eid, 'matchingId': param.MatchingId, 'finalProfileStatusIndicator': param.FinalProfileStatusIndicator, } input_mandatory = ['finalProfileStatusIndicator', 'iccid'] output_params = { 'header': param.JsonResponseHeader, } output_mandatory = ['header'] # ES2+ ReleaseProfile function (SGP.22 section 5.3.4) class ReleaseProfile(Es2PlusApiFunction): path = '/gsma/rsp2/es2plus/releaseProfile' input_params = { 'iccid': param.Iccid, } input_mandatory = ['iccid'] output_params = { 'header': param.JsonResponseHeader, } output_mandatory = ['header'] # ES2+ HandleDownloadProgress function (SGP.22 section 5.3.5) class HandleDownloadProgressInfo(Es2PlusApiFunction): path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo' input_params = { 'eid': param.Eid, 'iccid': param.Iccid, 'profileType': param.ProfileType, 'timestamp': param.Timestamp, 'notificationPointId': param.NotificationPointId, 'notificationPointStatus': param.NotificationPointStatus, 'resultData': param.ResultData, } input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus'] expected_http_status = 204 class Es2pApiClient: """Main class representing a full ES2+ API client. Has one method for each API function.""" def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None): self.func_id = 0 self.session = requests.Session() if server_cert_verify: self.session.verify = server_cert_verify if client_cert: self.session.cert = client_cert self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session) self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session) self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session) self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session) self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session) def _gen_func_id(self) -> str: """Generate the next function call id.""" self.func_id += 1 return 'FCI-%u-%u' % (time.time(), self.func_id) def call_downloadOrder(self, data: dict) -> dict: """Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1).""" return self.downloadOrder.call(data, self._gen_func_id()) def call_confirmOrder(self, data: dict) -> dict: """Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2).""" return self.confirmOrder.call(data, self._gen_func_id()) def call_cancelOrder(self, data: dict) -> dict: """Perform ES2+ CancelOrder function (SGP.22 section 5.3.3).""" return self.cancelOrder.call(data, self._gen_func_id()) def call_releaseProfile(self, data: dict) -> dict: """Perform ES2+ CancelOrder function (SGP.22 section 5.3.4).""" return self.releaseProfile.call(data, self._gen_func_id()) def call_handleDownloadProgressInfo(self, data: dict) -> dict: """Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5).""" return self.handleDownloadProgressInfo.call(data, self._gen_func_id())