Compare commits
13 Commits
laforge/ot
...
ewild/ossl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e27b5107b | ||
|
|
0d24f35776 | ||
|
|
89e6e0b0bc | ||
|
|
3196f2fadf | ||
|
|
f98b1a0080 | ||
|
|
dc5fdd34bf | ||
|
|
67995146eb | ||
|
|
ccefc98160 | ||
|
|
79805d1dd7 | ||
|
|
5969901be5 | ||
|
|
5316f2b1cc | ||
|
|
9572cbdb61 | ||
|
|
7fe7bff3d8 |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -7,6 +7,10 @@
|
||||
/.local
|
||||
/build
|
||||
/pySim.egg-info
|
||||
/smdpp-data/sm-dp-sessions
|
||||
/smdpp-data/sm-dp-sessions*
|
||||
dist
|
||||
tags
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
|
||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
|
||||
smdpp-data/generated
|
||||
smdpp-data/certs/dhparam2048.pem
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import argparse
|
||||
from osmocom.utils import b2h
|
||||
import websockets
|
||||
|
||||
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CardWsClientBlocking(WsClientBlocking):
|
||||
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, ws_uri, tp: LinkBase):
|
||||
super().__init__(ws_uri)
|
||||
self.tp = tp
|
||||
|
||||
def perform_outbound_hello(self):
|
||||
hello_data = {
|
||||
'client_type': 'card',
|
||||
'atr': b2h(self.tp.get_atr()),
|
||||
# TODO: include various card information in the HELLO message
|
||||
}
|
||||
super().perform_outbound_hello(hello_data)
|
||||
|
||||
def handle_rx_c_apdu(self, rx: dict):
|
||||
"""handle an inbound APDU transceive command"""
|
||||
data, sw = self.tp.send_apdu(rx['command'])
|
||||
tx = {
|
||||
'response': data,
|
||||
'sw': sw,
|
||||
}
|
||||
self.tx_json('r_apdu', tx)
|
||||
|
||||
def handle_rx_disconnect(self, rx: dict):
|
||||
"""server tells us to disconnect"""
|
||||
self.tx_json('disconnect_ack')
|
||||
# FIXME: tear down connection and/or terminate entire program
|
||||
|
||||
def handle_rx_print(self, rx: dict):
|
||||
"""print a message (text) given by server to the local console/log"""
|
||||
print(rx['message'])
|
||||
# no response
|
||||
|
||||
def handle_rx_reset_req(self, rx: dict):
|
||||
"""server tells us to reset the card"""
|
||||
self.tp.reset()
|
||||
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
argparse_add_reader_args(parser)
|
||||
parser.add_argument("--uri", default="ws://localhost:8765/",
|
||||
help="URI of the sever to which to connect")
|
||||
|
||||
opts = parser.parse_args()
|
||||
|
||||
# open the card reader / slot
|
||||
logger.info("Initializing Card Reader...")
|
||||
tp = init_reader(opts)
|
||||
logger.info("Connecting to Card...")
|
||||
tp.connect()
|
||||
scc = SimCardCommands(transport=tp)
|
||||
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
|
||||
|
||||
# TODO: gather various information about the card; print it
|
||||
|
||||
# create + connect the client to the server
|
||||
cl = CardWsClientBlocking(opts.uri, tp)
|
||||
logger.info("Connecting to remote server...")
|
||||
try:
|
||||
cl.connect()
|
||||
print("Successfully connected to Server")
|
||||
except ConnectionRefusedError as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# endless loop: wait for inbound command from server + execute it
|
||||
cl.rx_and_execute_cmd()
|
||||
# TODO: clean handling of websocket disconnect
|
||||
except websockets.exceptions.ConnectionClosedOK as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt as e:
|
||||
print(e.__class__.__name__)
|
||||
sys.exit(2)
|
||||
@@ -1,241 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional, Tuple
|
||||
from websockets.asyncio.server import serve
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
from osmocom.utils import Hexstr, swap_nibbles
|
||||
|
||||
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
|
||||
from pySim.exceptions import SwMatchError
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
card_clients = set()
|
||||
user_clients = set()
|
||||
|
||||
class WsClient:
|
||||
def __init__(self, websocket, hello: dict):
|
||||
self.websocket = websocket
|
||||
self.hello = hello
|
||||
self.identity = {}
|
||||
|
||||
def __str__(self):
|
||||
return '%s(ws=%s)' % (self.__class__.__name__, self.websocket)
|
||||
|
||||
async def rx_json(self):
|
||||
rx = await self.websocket.recv()
|
||||
rx_js = json.loads(rx)
|
||||
logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
async def tx_json(self, msg_type:str, d: dict = {}):
|
||||
"""Transmit a json-serializable dict to the peer"""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
logger.debug("Tx: %s", d_js)
|
||||
await self.websocket.send(d_js)
|
||||
|
||||
async def tx_hello_ack(self):
|
||||
await self.tx_json('hello_ack')
|
||||
|
||||
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
|
||||
await self.tx_json(msg_type, d)
|
||||
rx = await self.rx_json()
|
||||
if exp_msg_type:
|
||||
assert rx['msg_type'] == exp_msg_type
|
||||
return rx;
|
||||
|
||||
async def tx_error(self, message: str):
|
||||
"""Transmit an error message to the peer"""
|
||||
event = {
|
||||
"message": message,
|
||||
}
|
||||
await self.tx_json('error', event)
|
||||
|
||||
async def ws_hdlr(self):
|
||||
"""kind of a 'main' function for the websocket client: wait for incoming message,
|
||||
and handle it."""
|
||||
try:
|
||||
async for message in self.websocket:
|
||||
method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
|
||||
if not method:
|
||||
await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
|
||||
else:
|
||||
method(message)
|
||||
except ConnectionClosedError:
|
||||
# we handle this in the outer loop
|
||||
pass
|
||||
|
||||
class CardClient(WsClient):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.state = 'init'
|
||||
|
||||
def __str__(self):
|
||||
eid = self.identity.get('EID', None)
|
||||
if eid:
|
||||
return '%s(EID=%s)' % (self.__class__.__name__, eid)
|
||||
iccid = self.identity.get('ICCID', None)
|
||||
if iccid:
|
||||
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
|
||||
return super().__str__()
|
||||
|
||||
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
|
||||
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
"""transceive a single APDU with the card"""
|
||||
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return message['response'], message['sw']
|
||||
|
||||
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
|
||||
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
|
||||
prev_pdu = pdu
|
||||
data, sw = await self.xceive_apdu_raw(pdu)
|
||||
|
||||
if sw is not None:
|
||||
while (sw[0:2] in ['9f', '61', '62', '63']):
|
||||
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
|
||||
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
|
||||
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
|
||||
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
|
||||
prev_pdu = pdu_gr
|
||||
d, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
data += d
|
||||
if sw[0:2] == '6c':
|
||||
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
||||
pdu_gr = prev_pdu[0:8] + sw[2:4]
|
||||
data, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
|
||||
return data, sw
|
||||
|
||||
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
|
||||
"""like xceive_apdu, but checking the status word matches the expected pattern"""
|
||||
rv = await self.xceive_apdu(pdu)
|
||||
last_sw = rv[1]
|
||||
|
||||
if not sw_match(rv[1], sw):
|
||||
raise SwMatchError(rv[1], sw.lower())
|
||||
return rv
|
||||
|
||||
async def card_reset(self):
|
||||
"""reset the card"""
|
||||
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
|
||||
|
||||
async def get_iccid(self):
|
||||
"""high-level method to obtain the ICCID of the card"""
|
||||
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
|
||||
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
|
||||
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
|
||||
return dec_iccid(res)
|
||||
|
||||
async def get_eid_sgp22(self):
|
||||
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
|
||||
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
|
||||
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
|
||||
return res[-32:]
|
||||
|
||||
async def identify(self):
|
||||
# identify the card by asking for its EID and/or ICCID
|
||||
try:
|
||||
eid = await self.get_eid_sgp22()
|
||||
logger.debug("EID: %s", eid)
|
||||
self.identity['EID'] = eid
|
||||
except SwMatchError:
|
||||
pass
|
||||
try:
|
||||
iccid = await self.get_iccid()
|
||||
logger.debug("ICCID: %s", iccid)
|
||||
self.identity['ICCID'] = iccid
|
||||
except SwMatchError:
|
||||
pass
|
||||
logger.info("Card now in READY state")
|
||||
self.state = 'ready'
|
||||
|
||||
@staticmethod
|
||||
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
|
||||
for c in card_clients:
|
||||
print("testing card %s in state %s" % (c, c.state))
|
||||
if c.state != 'ready':
|
||||
continue
|
||||
c_id = c.identity.get(id_type.upper(), None)
|
||||
if c_id and c_id.lower() == id_str.lower():
|
||||
return c
|
||||
return None
|
||||
|
||||
class UserClient(WsClient):
|
||||
"""A websocket client representing a user application like pySim-shell."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.state = 'init'
|
||||
|
||||
async def state_init(self):
|
||||
"""Wait for incoming 'select_card' and process it."""
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'select_card':
|
||||
# look-up if the card can be found
|
||||
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
|
||||
if not card:
|
||||
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
|
||||
continue
|
||||
# transition to next statee
|
||||
self.state = 'associated'
|
||||
card.state = 'associated'
|
||||
self.card = card
|
||||
await self.tx_json('select_card_ack', {'identities': card.identity})
|
||||
break
|
||||
else:
|
||||
self.tx_error('Unknown message type %s' % rx['msg_type'])
|
||||
|
||||
async def state_selected(self):
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'c_apdu':
|
||||
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
|
||||
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
|
||||
|
||||
|
||||
async def ws_conn_hdlr(websocket):
|
||||
rx_raw = await websocket.recv()
|
||||
rx = json.loads(rx_raw)
|
||||
assert rx['msg_type'] == 'hello'
|
||||
client_type = rx['client_type']
|
||||
logger.info("New client (type %s) connection accepted", client_type)
|
||||
|
||||
if client_type == 'card':
|
||||
card = CardClient(websocket, rx)
|
||||
await card.tx_hello_ack()
|
||||
card_clients.add(card)
|
||||
# first obtain the identity of the card
|
||||
await card.identify()
|
||||
# then go into the "main loop"
|
||||
try:
|
||||
await card.ws_hdlr()
|
||||
finally:
|
||||
logger.info("%s: connection closed", card)
|
||||
card_clients.remove(card)
|
||||
elif client_type == 'user':
|
||||
user = UserClient(websocket, rx)
|
||||
await user.tx_hello_ack()
|
||||
user_clients.add(user)
|
||||
# first wait for the user to specify the select the card
|
||||
await user.state_init()
|
||||
try:
|
||||
await user.state_selected()
|
||||
finally:
|
||||
logger.info("%s: connection closed", user)
|
||||
user_clients.remove(user)
|
||||
else:
|
||||
logger.info("Rejecting client (unknown type %s) connection", client_type)
|
||||
raise ValueError
|
||||
|
||||
|
||||
async def main():
|
||||
async with serve(ws_conn_hdlr, "localhost", 8765):
|
||||
await asyncio.get_running_loop().create_future() # run forever
|
||||
|
||||
asyncio.run(main())
|
||||
@@ -1,206 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
# This is a script to generate a [partial] eSIM profile from the 'fsdump' of another USIM/ISIM. This is
|
||||
# useful to generate an "as close as possible" eSIM from a physical USIM, as far as that is possible
|
||||
# programmatically and in a portable way.
|
||||
#
|
||||
# Of course, this really only affects the file sytem aspects of the card. It's not possible
|
||||
# to read the K/OPc or other authentication related parameters off a random USIM, and hence
|
||||
# we cannot replicate that. Similarly, it's not possible to export the java applets from a USIM,
|
||||
# and hence we cannot replicate those.
|
||||
|
||||
import argparse
|
||||
|
||||
from pySim.esim.saip import *
|
||||
from pySim.ts_102_221 import *
|
||||
|
||||
class FsdumpToSaip:
|
||||
def __init__(self, pes: ProfileElementSequence):
|
||||
self.pes = pes
|
||||
|
||||
@staticmethod
|
||||
def fcp_raw2saip_fcp(fname:str, fcp_raw: bytes) -> Dict:
|
||||
"""Convert a raw TLV-encoded FCP to a SAIP dict-format (as needed by asn1encode)."""
|
||||
ftype = fname.split('.')[0]
|
||||
# use the raw FCP as basis so we don't get stuck with potentially old decoder bugs
|
||||
# ore future format incompatibilities
|
||||
fcp = FcpTemplate()
|
||||
fcp.from_tlv(fcp_raw)
|
||||
r = {}
|
||||
|
||||
r['fileDescriptor'] = fcp.child_by_type(FileDescriptor).to_bytes()
|
||||
|
||||
file_id = fcp.child_by_type(FileIdentifier)
|
||||
if file_id:
|
||||
r['fileID'] = file_id.to_bytes()
|
||||
else:
|
||||
if ftype in ['ADF']:
|
||||
print('%s is an ADF but has no [mandatory] file_id!' % fname)
|
||||
#raise ValueError('%s is an ADF but has no [mandatory] file_id!' % fname)
|
||||
r['fileID'] = b'\x7f\xff' # FIXME: auto-increment
|
||||
|
||||
df_name = fcp.child_by_type(DfName)
|
||||
if ftype in ['ADF']:
|
||||
if not df_name:
|
||||
raise ValueError('%s is an ADF but has no [mandatory] df_name!' % fname)
|
||||
r['dfName'] = df_name.to_bytes()
|
||||
|
||||
lcsi_byte = fcp.child_by_type(LifeCycleStatusInteger).to_bytes()
|
||||
if lcsi_byte != b'\x05':
|
||||
r['lcsi'] = lcsi_byte
|
||||
|
||||
sa_ref = fcp.child_by_type(SecurityAttribReferenced)
|
||||
if sa_ref:
|
||||
r['securityAttributesReferenced'] = sa_ref.to_bytes()
|
||||
|
||||
file_size = fcp.child_by_type(FileSize)
|
||||
if ftype in ['EF']:
|
||||
if file_size:
|
||||
r['efFileSize'] = file_size.to_bytes()
|
||||
|
||||
psdo = fcp.child_by_type(PinStatusTemplate_DO)
|
||||
if ftype in ['MF', 'ADF', 'DF']:
|
||||
if not psdo:
|
||||
raise ValueError('%s is an %s but has no [mandatory] PinStatusTemplateDO' % fname)
|
||||
else:
|
||||
r['pinStatusTemplateDO'] = psdo.to_bytes()
|
||||
|
||||
sfid = fcp.child_by_type(ShortFileIdentifier)
|
||||
if sfid and sfid.decoded:
|
||||
if ftype not in ['EF']:
|
||||
raise ValueError('%s is not an EF but has [forbidden] shortEFID' % fname)
|
||||
r['shortEFID'] = sfid.to_bytes()
|
||||
|
||||
pinfo = fcp.child_by_type(ProprietaryInformation)
|
||||
if pinfo and ftype in ['EF']:
|
||||
spinfo = pinfo.child_by_type(SpecialFileInfo)
|
||||
fill_p = pinfo.child_by_type(FillingPattern)
|
||||
repeat_p = pinfo.child_by_type(RepeatPattern)
|
||||
# only exists for BER-TLV files
|
||||
max_fsize = pinfo.child_by_type(MaximumFileSize)
|
||||
|
||||
if spinfo or fill_p or repeat_p or max_fsize:
|
||||
r['proprietaryEFInfo'] = {}
|
||||
if spinfo:
|
||||
r['proprietaryEFInfo']['specialFileInformation'] = spinfo.to_bytes()
|
||||
if fill_p:
|
||||
r['proprietaryEFInfo']['fillPattern'] = fill_p.to_bytes()
|
||||
if repeat_p:
|
||||
r['proprietaryEFInfo']['repeatPattern'] = repeat_p.to_bytes()
|
||||
if max_fsize:
|
||||
r['proprietaryEFInfo']['maximumFileSize'] = max_fsize.to_bytes()
|
||||
|
||||
# TODO: linkPath
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def fcp_fsdump2saip(fsdump_ef: Dict):
|
||||
"""Convert a file from its "fsdump" representation to the SAIP representation of a File type
|
||||
in the decoded format as used by the asn1tools-generated codec."""
|
||||
# first convert the FCP
|
||||
path = fsdump_ef['path']
|
||||
fdesc = FsdumpToSaip.fcp_raw2saip_fcp(path[-1], h2b(fsdump_ef['fcp_raw']))
|
||||
r = [
|
||||
('fileDescriptor', fdesc),
|
||||
]
|
||||
# then convert the body. We're doing a straight-forward conversion without any optimization
|
||||
# like not encoding all-ff files. This should be done by a subsequent optimizer
|
||||
if 'body' in fsdump_ef and fsdump_ef['body']:
|
||||
if isinstance(fsdump_ef['body'], list):
|
||||
for b_seg in fsdump_ef['body']:
|
||||
r.append(('fillFileContent', h2b(b_seg)))
|
||||
else:
|
||||
r.append(('fillFileContent', h2b(fsdump_ef['body'])))
|
||||
print(fsdump_ef['path'])
|
||||
return r
|
||||
|
||||
def add_file_from_fsdump(self, fsdump_ef: Dict):
|
||||
fid = int(fsdump_ef['fcp']['file_identifier'])
|
||||
# determine NAA
|
||||
if fsdump_ef['path'][0:1] == ['MF', 'ADF.USIM']:
|
||||
naa = NaaUsim
|
||||
elif fsdump_ef['path'][0:1] == ['MF', 'ADF.ISIM']:
|
||||
naa = NaaIsim
|
||||
else:
|
||||
print("Unable to determine NAA for %s" % fsdump_ef['path'])
|
||||
return
|
||||
pes.pes_by_naa[naa.name]
|
||||
for pe in pes:
|
||||
print("PE %s" % pe)
|
||||
if not isinstance(pe, FsProfileElement):
|
||||
print("Skipping PE %s" % pe)
|
||||
continue
|
||||
if not pe.template:
|
||||
print("No template for PE %s" % pe )
|
||||
continue
|
||||
if not fid in pe.template.files_by_fid:
|
||||
print("File %04x not available in template; must create it using GenericFileMgmt" % fid)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('fsdump', help='')
|
||||
|
||||
def has_unsupported_path_prefix(path: List[str]) -> bool:
|
||||
# skip some paths from export as they don't exist in an eSIM profile
|
||||
UNSUPPORTED_PATHS = [
|
||||
['MF', 'DF.GSM'],
|
||||
]
|
||||
for p in UNSUPPORTED_PATHS:
|
||||
prefix = path[:len(p)]
|
||||
if prefix == p:
|
||||
return True
|
||||
# any ADF not USIM or ISIM are unsupported
|
||||
SUPPORTED_ADFS = [ 'ADF.USIM', 'ADF.ISIM' ]
|
||||
if len(path) == 2 and path[0] == 'MF' and path[1].startswith('ADF.') and path[1] not in SUPPORTED_ADFS:
|
||||
return True
|
||||
return False
|
||||
|
||||
import traceback
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
with open(opts.fsdump, 'r') as f:
|
||||
fsdump = json.loads(f.read())
|
||||
|
||||
pes = ProfileElementSequence()
|
||||
|
||||
# FIXME: fsdump has strting-name path, but we need FID-list path for ProfileElementSequence
|
||||
for path, fsdump_ef in fsdump['files'].items():
|
||||
print("=" * 80)
|
||||
#print(fsdump_ef)
|
||||
if not 'fcp_raw' in fsdump_ef:
|
||||
continue
|
||||
if has_unsupported_path_prefix(fsdump_ef['path']):
|
||||
print("Skipping eSIM-unsupported path %s" % ('/'.join(fsdump_ef['path'])))
|
||||
continue
|
||||
saip_dec = FsdumpToSaip.fcp_fsdump2saip(fsdump_ef)
|
||||
#print(saip_dec)
|
||||
try:
|
||||
f = pes.add_file_at_path(Path(path), saip_dec)
|
||||
print(repr(f))
|
||||
except Exception as e:
|
||||
print("EXCEPTION: %s" % traceback.format_exc())
|
||||
#print("EXCEPTION: %s" % e)
|
||||
continue
|
||||
|
||||
print("=== Tree === ")
|
||||
pes.mf.print_tree()
|
||||
|
||||
# FIXME: export the actual PE Sequence
|
||||
|
||||
661
contrib/generate_smdpp_certs.py
Executable file
661
contrib/generate_smdpp_certs.py
Executable file
@@ -0,0 +1,661 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Faithfully reproduces the smdpp certs contained in SGP.26_v1.5_Certificates_18_07_2024.zip
|
||||
available at https://www.gsma.com/solutions-and-impact/technologies/esim/gsma_resources/sgp-26-test-certificate-definition-v1-5/
|
||||
Only usable for testing, it obviously uses a different CI key.
|
||||
"""
|
||||
|
||||
import os
|
||||
import binascii
|
||||
from datetime import datetime
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
# Custom OIDs used in certificates
|
||||
OID_CERTIFICATE_POLICIES_CI = "2.23.146.1.2.1.0" # CI cert policy
|
||||
OID_CERTIFICATE_POLICIES_TLS = "2.23.146.1.2.1.3" # DPtls cert policy
|
||||
OID_CERTIFICATE_POLICIES_AUTH = "2.23.146.1.2.1.4" # DPauth cert policy
|
||||
OID_CERTIFICATE_POLICIES_PB = "2.23.146.1.2.1.5" # DPpb cert policy
|
||||
|
||||
# Subject Alternative Name OIDs
|
||||
OID_CI_RID = "2.999.1" # CI Registered ID
|
||||
OID_DP_RID = "2.999.10" # DP+ Registered ID
|
||||
OID_DP2_RID = "2.999.12" # DP+2 Registered ID
|
||||
OID_DP4_RID = "2.999.14" # DP+4 Registered ID
|
||||
OID_DP8_RID = "2.999.18" # DP+8 Registered ID
|
||||
|
||||
|
||||
class SimplifiedCertificateGenerator:
|
||||
def __init__(self):
|
||||
self.backend = default_backend()
|
||||
# Store generated CI keys to sign other certs
|
||||
self.ci_certs = {} # {"BRP": cert, "NIST": cert}
|
||||
self.ci_keys = {} # {"BRP": key, "NIST": key}
|
||||
|
||||
def get_curve(self, curve_type):
|
||||
"""Get the appropriate curve object."""
|
||||
if curve_type == "BRP":
|
||||
return ec.BrainpoolP256R1()
|
||||
else:
|
||||
return ec.SECP256R1()
|
||||
|
||||
def generate_key_pair(self, curve):
|
||||
"""Generate a new EC key pair."""
|
||||
private_key = ec.generate_private_key(curve, self.backend)
|
||||
return private_key
|
||||
|
||||
def load_private_key_from_hex(self, hex_key, curve):
|
||||
"""Load EC private key from hex string."""
|
||||
key_bytes = binascii.unhexlify(hex_key.replace(":", "").replace(" ", "").replace("\n", ""))
|
||||
key_int = int.from_bytes(key_bytes, 'big')
|
||||
return ec.derive_private_key(key_int, curve, self.backend)
|
||||
|
||||
def generate_ci_cert(self, curve_type):
|
||||
"""Generate CI certificate for either BRP or NIST curve."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.generate_key_pair(curve)
|
||||
|
||||
# Build subject and issuer (self-signed) - same for both
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test CI"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "TESTCERT"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSPTEST"),
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "IT"),
|
||||
])
|
||||
|
||||
# Build certificate - all parameters same for both
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(issuer)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 8, 27, 51))
|
||||
builder = builder.not_valid_after(datetime(2055, 4, 1, 8, 27, 51))
|
||||
builder = builder.serial_number(0xb874f3abfa6c44d3)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
# Add extensions - all same for both
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=None),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_CI),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=True,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(OID_CI_RID))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(private_key, hashes.SHA256(), self.backend)
|
||||
|
||||
self.ci_keys[curve_type] = private_key
|
||||
self.ci_certs[curve_type] = certificate
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_dp_cert(self, curve_type, subject_cn, serial, key_hex,
|
||||
cert_policy_oid, rid_oid, validity_start, validity_end):
|
||||
"""Generate a DP certificate signed by CI - works for both BRP and NIST."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(cert_policy_oid),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_tls_cert(self, curve_type, subject_cn, dns_name, serial, key_hex,
|
||||
rid_oid, validity_start, validity_end):
|
||||
"""Generate a TLS certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ACME"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(validity_start)
|
||||
builder = builder.not_valid_after(validity_end)
|
||||
builder = builder.serial_number(serial)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.ExtendedKeyUsage([
|
||||
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
|
||||
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier(OID_CERTIFICATE_POLICIES_TLS),
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.DNSName(dns_name),
|
||||
x509.RegisteredID(x509.ObjectIdentifier(rid_oid))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-A.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
),
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_eum_cert(self, curve_type, key_hex):
|
||||
"""Generate EUM certificate signed by CI."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
ci_cert = self.ci_certs[curve_type]
|
||||
ci_key = self.ci_keys[curve_type]
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "EUM Test"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(ci_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 28, 37))
|
||||
builder = builder.not_valid_after(datetime(2054, 3, 24, 9, 28, 37))
|
||||
builder = builder.serial_number(0x12345678)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(ci_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=False,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=True,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.2"), # EUM policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectAlternativeName([
|
||||
x509.RegisteredID(x509.ObjectIdentifier("2.999.5"))
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=0),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CRLDistributionPoints([
|
||||
x509.DistributionPoint(
|
||||
full_name=[x509.UniformResourceIdentifier("http://ci.test.example.com/CRL-B.crl")],
|
||||
relative_name=None,
|
||||
reasons=None,
|
||||
crl_issuer=None
|
||||
)
|
||||
]),
|
||||
critical=False
|
||||
)
|
||||
|
||||
# Name Constraints
|
||||
constrained_name = x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032"),
|
||||
])
|
||||
|
||||
name_constraints = x509.NameConstraints(
|
||||
permitted_subtrees=[
|
||||
x509.DirectoryName(constrained_name)
|
||||
],
|
||||
excluded_subtrees=None
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
name_constraints,
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(ci_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def generate_euicc_cert(self, curve_type, eum_cert, eum_key, key_hex):
|
||||
"""Generate eUICC certificate signed by EUM."""
|
||||
curve = self.get_curve(curve_type)
|
||||
private_key = self.load_private_key_from_hex(key_hex, curve)
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RSP Test EUM"),
|
||||
x509.NameAttribute(NameOID.SERIAL_NUMBER, "89049032123451234512345678901235"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Test eUICC"),
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(eum_cert.subject)
|
||||
builder = builder.not_valid_before(datetime(2020, 4, 1, 9, 48, 58))
|
||||
builder = builder.not_valid_after(datetime(7496, 1, 24, 9, 48, 58))
|
||||
builder = builder.serial_number(0x0200000000000001)
|
||||
builder = builder.public_key(private_key.public_key())
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier.from_issuer_public_key(eum_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
|
||||
critical=False
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.KeyUsage(
|
||||
digital_signature=True,
|
||||
content_commitment=False,
|
||||
key_encipherment=False,
|
||||
data_encipherment=False,
|
||||
key_agreement=False,
|
||||
key_cert_sign=False,
|
||||
crl_sign=False,
|
||||
encipher_only=False,
|
||||
decipher_only=False
|
||||
),
|
||||
critical=True
|
||||
)
|
||||
|
||||
builder = builder.add_extension(
|
||||
x509.CertificatePolicies([
|
||||
x509.PolicyInformation(
|
||||
x509.ObjectIdentifier("2.23.146.1.2.1.1"), # eUICC policy
|
||||
policy_qualifiers=None
|
||||
)
|
||||
]),
|
||||
critical=True
|
||||
)
|
||||
|
||||
certificate = builder.sign(eum_key, hashes.SHA256(), self.backend)
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def save_cert_and_key(self, cert, key, cert_path_der, cert_path_pem, key_path_sk, key_path_pk):
|
||||
"""Save certificate and key in various formats."""
|
||||
# Create directories if needed
|
||||
os.makedirs(os.path.dirname(cert_path_der), exist_ok=True)
|
||||
|
||||
with open(cert_path_der, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.DER))
|
||||
|
||||
if cert_path_pem:
|
||||
with open(cert_path_pem, "wb") as f:
|
||||
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
if key and key_path_sk:
|
||||
with open(key_path_sk, "wb") as f:
|
||||
f.write(key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
))
|
||||
|
||||
if key and key_path_pk:
|
||||
with open(key_path_pk, "wb") as f:
|
||||
f.write(key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
))
|
||||
|
||||
|
||||
def main():
|
||||
gen = SimplifiedCertificateGenerator()
|
||||
|
||||
output_dir = "smdpp-data/generated"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
print("=== Generating CI Certificates ===")
|
||||
|
||||
for curve_type in ["BRP", "NIST"]:
|
||||
ci_cert, ci_key = gen.generate_ci_cert(curve_type)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
ci_cert, ci_key,
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.der",
|
||||
f"{output_dir}/CertificateIssuer/CERT_CI{suffix}.pem",
|
||||
None, None
|
||||
)
|
||||
print(f"Generated CI {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPauth Certificates ===")
|
||||
|
||||
dpauth_configs = [
|
||||
("BRP", "TEST SM-DP+", 256, "93:fb:33:d0:58:4f:34:9b:07:f8:b5:d2:af:93:d7:c3:e3:54:b3:49:a3:b9:13:50:2e:6a:bc:07:0e:4d:49:29", OID_DP_RID, "DPauth"),
|
||||
("NIST", "TEST SM-DP+", 256, "0a:7c:c1:c2:44:e6:0c:52:cd:5b:78:07:ab:8c:36:0c:26:52:46:01:50:7d:ca:bc:5d:d5:98:b5:a6:16:d5:d5", OID_DP_RID, "DPauth"),
|
||||
("BRP", "TEST SM-DP+2", 512, "0c:17:35:5c:01:1d:0f:e8:d7:da:dd:63:f1:97:85:cf:6c:51:cb:cd:46:6a:e8:8b:e8:f8:1b:c1:05:88:46:f6", OID_DP2_RID, "DP2auth"),
|
||||
("NIST", "TEST SM-DP+2", 512, "9c:32:a0:95:d4:88:42:d9:ff:a4:04:f7:12:51:2a:a2:c5:42:5a:1a:26:38:6a:b6:a1:45:d5:81:1e:03:91:41", OID_DP2_RID, "DP2auth"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dpauth_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_AUTH, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 31, 30),
|
||||
datetime(2030, 3, 30, 8, 31, 30)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPauth/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPauth/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPauth/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPpb Certificates ===")
|
||||
|
||||
dppb_configs = [
|
||||
("BRP", "TEST SM-DP+", 257, "75:ff:32:2f:41:66:16:da:e1:a4:84:ef:71:d4:87:4f:b0:df:32:95:fd:35:c2:cb:a4:89:fb:b2:bb:9c:7b:f6", OID_DP_RID, "DPpb"),
|
||||
("NIST", "TEST SM-DP+", 257, "dc:d6:94:b7:78:95:7e:8e:9a:dd:bd:d9:44:33:e9:ef:8f:73:d1:1e:49:1c:48:d4:25:a3:8a:94:91:bd:3b:ed", OID_DP_RID, "DPpb"),
|
||||
("BRP", "TEST SM-DP+2", 513, "9c:ae:2e:1a:56:07:a9:d5:78:38:2e:ee:93:2e:25:1f:52:30:4f:86:ee:b1:f1:70:8c:db:d3:c0:7b:e2:cd:3d", OID_DP2_RID, "DP2pb"),
|
||||
("NIST", "TEST SM-DP+2", 513, "66:93:11:49:63:9d:ba:ac:1d:c3:d3:06:c5:8b:d2:df:d2:2f:73:bf:63:ac:86:31:98:32:90:b5:7f:90:93:45", OID_DP2_RID, "DP2pb"),
|
||||
]
|
||||
|
||||
for curve_type, cn, serial, key_hex, rid_oid, name_prefix in dppb_configs:
|
||||
cert, key = gen.generate_dp_cert(
|
||||
curve_type, cn, serial, key_hex,
|
||||
OID_CERTIFICATE_POLICIES_PB, rid_oid,
|
||||
datetime(2020, 4, 1, 8, 34, 46),
|
||||
datetime(2030, 3, 30, 8, 34, 46)
|
||||
)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPpb/CERT_S_SM_{name_prefix}{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPpb/SK_S_SM_{name_prefix}{suffix}.pem",
|
||||
f"{output_dir}/DPpb/PK_S_SM_{name_prefix}{suffix}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating DPtls Certificates ===")
|
||||
|
||||
dptls_configs = [
|
||||
("BRP", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "3f:67:15:28:02:b3:f4:c7:fa:e6:79:58:55:f6:82:54:1e:45:e3:5e:ff:f4:e8:a0:55:65:a0:f1:91:2a:78:2e", OID_DP_RID, "DP_TLS_BRP"),
|
||||
("NIST", "testsmdpplus1.example.com", "testsmdpplus1.example.com", 9, "a0:3e:7c:e4:55:04:74:be:a4:b7:a8:73:99:ce:5a:8c:9f:66:1b:68:0f:94:01:39:ff:f8:4e:9d:ec:6a:4d:8c", OID_DP_RID, "DP_TLS_NIST"),
|
||||
("NIST", "testsmdpplus2.example.com", "testsmdpplus2.example.com", 12, "4e:65:61:c6:40:88:f6:69:90:7a:db:e3:94:b1:1a:84:24:2e:03:3a:82:a8:84:02:31:63:6d:c9:1b:4e:e3:f5", OID_DP2_RID, "DP2_TLS"),
|
||||
("NIST", "testsmdpplus4.example.com", "testsmdpplus4.example.com", 14, "f2:65:9d:2f:52:8f:4b:11:37:40:d5:8a:0d:2a:f3:eb:2b:48:e1:22:c2:b6:0a:6a:f6:fc:96:ad:86:be:6f:a4", OID_DP4_RID, "DP4_TLS"),
|
||||
("NIST", "testsmdpplus8.example.com", "testsmdpplus8.example.com", 18, "ff:6e:4a:50:9b:ad:db:38:10:88:31:c2:3c:cc:2d:44:30:7a:f2:81:e9:25:96:7f:8c:df:1d:95:54:a0:28:8d", OID_DP8_RID, "DP8_TLS"),
|
||||
]
|
||||
|
||||
for curve_type, cn, dns, serial, key_hex, rid_oid, name_prefix in dptls_configs:
|
||||
cert, key = gen.generate_tls_cert(
|
||||
curve_type, cn, dns, serial, key_hex, rid_oid,
|
||||
datetime(2024, 7, 9, 15, 29, 36),
|
||||
datetime(2025, 8, 11, 15, 29, 36)
|
||||
)
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/DPtls/CERT_S_SM_{name_prefix}.der",
|
||||
None,
|
||||
f"{output_dir}/DPtls/SK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem",
|
||||
f"{output_dir}/DPtls/PK_S_SM_{name_prefix.replace('_BRP', '_BRP').replace('_NIST', '_NIST')}.pem"
|
||||
)
|
||||
print(f"Generated {name_prefix} certificate")
|
||||
|
||||
print("\n=== Generating EUM Certificates ===")
|
||||
|
||||
eum_configs = [
|
||||
("BRP", "12:9b:0a:b1:3f:17:e1:4a:40:b6:fa:4e:d8:23:e0:cf:46:5b:7b:3d:73:24:05:e6:29:5d:3b:23:b0:45:c9:9a"),
|
||||
("NIST", "25:e6:75:77:28:e1:e9:51:13:51:9c:dc:34:55:5c:29:ba:ed:23:77:3a:c5:af:dd:dc:da:d9:84:89:8a:52:f0"),
|
||||
]
|
||||
|
||||
eum_certs = {}
|
||||
eum_keys = {}
|
||||
|
||||
for curve_type, key_hex in eum_configs:
|
||||
cert, key = gen.generate_eum_cert(curve_type, key_hex)
|
||||
eum_certs[curve_type] = cert
|
||||
eum_keys[curve_type] = key
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/EUM/CERT_EUM{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/EUM/SK_EUM{suffix}.pem",
|
||||
f"{output_dir}/EUM/PK_EUM{suffix}.pem"
|
||||
)
|
||||
print(f"Generated EUM {curve_type} certificate")
|
||||
|
||||
print("\n=== Generating eUICC Certificates ===")
|
||||
|
||||
euicc_configs = [
|
||||
("BRP", "8d:c3:47:a7:6d:b7:bd:d6:22:2d:d7:5e:a1:a1:68:8a:ca:81:1e:4c:bc:6a:7f:6a:ef:a4:b2:64:19:62:0b:90"),
|
||||
("NIST", "11:e1:54:67:dc:19:4f:33:71:83:e4:60:c9:f6:32:60:09:1e:12:e8:10:26:cd:65:61:e1:7c:6d:85:39:cc:9c"),
|
||||
]
|
||||
|
||||
for curve_type, key_hex in euicc_configs:
|
||||
cert, key = gen.generate_euicc_cert(curve_type, eum_certs[curve_type], eum_keys[curve_type], key_hex)
|
||||
suffix = "_ECDSA_BRP" if curve_type == "BRP" else "_ECDSA_NIST"
|
||||
gen.save_cert_and_key(
|
||||
cert, key,
|
||||
f"{output_dir}/eUICC/CERT_EUICC{suffix}.der",
|
||||
None,
|
||||
f"{output_dir}/eUICC/SK_EUICC{suffix}.pem",
|
||||
f"{output_dir}/eUICC/PK_EUICC{suffix}.pem"
|
||||
)
|
||||
print(f"Generated eUICC {curve_type} certificate")
|
||||
|
||||
print("\n=== Certificate generation complete! ===")
|
||||
print(f"All certificates saved to: {output_dir}/")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -48,7 +48,6 @@ pySim consists of several parts:
|
||||
sim-rest
|
||||
suci-keytool
|
||||
saip-tool
|
||||
wsrc
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
WebSocket Remote Card (WSRC)
|
||||
============================
|
||||
|
||||
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
|
||||
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
|
||||
method was chosen to be as firewall/NAT friendly as possible.
|
||||
|
||||
WSRC Network Architecture
|
||||
-------------------------
|
||||
|
||||
In a WSRC network, there are three major elements:
|
||||
|
||||
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
|
||||
to a remote *WSRC Server*
|
||||
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
|
||||
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
|
||||
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
|
||||
|
||||
WSRC Protocol
|
||||
-------------
|
||||
|
||||
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
|
||||
is based on HTTP and should usually operate via TLS for security reasons.
|
||||
|
||||
The detailed protocol is currently still WIP. The plan is to document it here.
|
||||
|
||||
|
||||
pySim implementations
|
||||
---------------------
|
||||
|
||||
TBD
|
||||
1215
osmo-smdpp.py
1215
osmo-smdpp.py
File diff suppressed because it is too large
Load Diff
154
ota_test.py
154
ota_test.py
@@ -1,154 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from pySim.ota import *
|
||||
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
|
||||
from pySim.utils import h2b, h2b
|
||||
|
||||
# pre-defined SPI values for use in test cases below
|
||||
SPI_CC_POR_CIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_NOCC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'no_rc_cc_ds',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
# SJA5 SAMPLE cards provisioned by execute_ipr.py
|
||||
OTA_KEYSET_SJA5_SAMPLES = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3,
|
||||
kic=h2b('300102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('301102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
algo_auth='aes_cmac', kid_idx=2,
|
||||
kic=h2b('200102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('201102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
# TS.48 profile on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
algo_auth='aes_cmac', kid_idx=2,
|
||||
kic=h2b('66778899AABBCCDD1122334455EEFF10'),
|
||||
kid=h2b('112233445566778899AABBCCDDEEFF10'))
|
||||
|
||||
|
||||
# ISD-R on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128_ISDR = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('B52F9C5938D1C19ED73E1AE772937FD7'),
|
||||
kid=h2b('3BC696ACD1EEC95A6624F7330D22FC81'))
|
||||
|
||||
# ISD-A on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128_ISDA = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('8DAAD1DAAA8D7C9000E3BBED8B7556E7'),
|
||||
kid=h2b('5392D503AE050DDEAF81AFAEFF275A2B'))
|
||||
|
||||
# TODO: AES192
|
||||
# TODO: AES256
|
||||
|
||||
testcases = [
|
||||
{
|
||||
'name': '3DES-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
}
|
||||
}, {
|
||||
'name': 'AES128-C2G-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_C2G_AES128_ISDR,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
#'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'apdu': h2b('80ec800300'),
|
||||
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
for t in testcases:
|
||||
print()
|
||||
print("==== TESTCASE: %s" % t['name'])
|
||||
od = t['ota_keyset']
|
||||
|
||||
# RAM: B00000
|
||||
# SIM RFM: B00010
|
||||
# USIM RFM: B00011
|
||||
# ISD-R: 000001
|
||||
# ECASD: 000002
|
||||
tar = h2b('000001')
|
||||
|
||||
dialect = OtaDialectSms()
|
||||
outp = dialect.encode_cmd(od, tar, t['spi'], apdu=t['request']['apdu'])
|
||||
print("result: %s" % b2h(outp))
|
||||
#assert(b2h(outp) == t['request']['encoded_cmd'])
|
||||
|
||||
with_udh = b'\x02\x70\x00' + outp
|
||||
print("with_udh: %s" % b2h(with_udh))
|
||||
|
||||
|
||||
# processing of the response from the card
|
||||
da = AddressField('12345678', 'unknown', 'isdn_e164')
|
||||
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
|
||||
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
|
||||
print("TPDU: %s" % tpdu)
|
||||
print("tpdu: %s" % b2h(tpdu.to_bytes()))
|
||||
#assert(b2h(tpdu.to_bytes()) == t['request']['encoded_tpdu'])
|
||||
|
||||
r = dialect.decode_resp(od, t['spi'], t['response']['encoded_resp'])
|
||||
print("RESP: ", r)
|
||||
@@ -23,7 +23,6 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
|
||||
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
|
||||
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
|
||||
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
|
||||
from pySim.apdu_source.stdin_hex import StdinHexApduSource
|
||||
|
||||
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
|
||||
|
||||
@@ -33,11 +32,10 @@ logger = colorlog.getLogger()
|
||||
|
||||
# merge all of the command sets into one global set. This will override instructions,
|
||||
# the one from the 'last' set in the addition below will prevail.
|
||||
from pySim.apdu.ts_51_011 import ApduCommands as SimApduCommands
|
||||
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
|
||||
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
|
||||
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
|
||||
ApduCommands = SimApduCommands + UiccApduCommands + UsimApduCommands #+ GpApduCommands
|
||||
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
|
||||
|
||||
|
||||
class DummySimLink(LinkBase):
|
||||
@@ -192,10 +190,6 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||
help='Name of te log file to be read')
|
||||
|
||||
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
|
||||
Read APDUs as hex-string from stdin.""")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
opts = option_parser.parse_args()
|
||||
@@ -211,8 +205,6 @@ if __name__ == '__main__':
|
||||
s = PysharkGsmtapPcap(opts.pcap_file)
|
||||
elif opts.source == 'tca-loader-log':
|
||||
s = TcaLoaderLogApduSource(opts.log_file)
|
||||
elif opts.source == 'stdin-hex':
|
||||
s = StdinHexApduSource()
|
||||
else:
|
||||
raise ValueError("unsupported source %s", opts.source)
|
||||
|
||||
|
||||
@@ -1,339 +0,0 @@
|
||||
# coding=utf-8
|
||||
"""APDU definitions/decoders of 3GPP TS 51.011, the classic SIM spec.
|
||||
|
||||
(C) 2022 by Harald Welte <laforge@osmocom.org>
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from construct import GreedyRange, Struct
|
||||
from pySim.construct import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.runtime import RuntimeLchan
|
||||
from pySim.apdu import ApduCommand, ApduCommandSet
|
||||
from typing import Optional, Dict, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# TS 51.011 Section 9.2.1
|
||||
class SimSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
|
||||
for file in path:
|
||||
file_hex = b2h(file)
|
||||
sels = lchan.selected_file.get_selectables(['FIDS'])
|
||||
if file_hex in sels:
|
||||
if self.successful:
|
||||
#print("\tSELECT %s" % sels[file_hex])
|
||||
lchan.selected_file = sels[file_hex]
|
||||
else:
|
||||
#print("\tSELECT %s FAILED" % sels[file_hex])
|
||||
pass
|
||||
continue
|
||||
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
|
||||
if len(self.cmd_data) != 2:
|
||||
raise ValueError('Expecting a 2-byte FID')
|
||||
|
||||
# decode the SELECT response
|
||||
if self.successful:
|
||||
self.file = lchan.selected_file
|
||||
if 'body' in self.rsp_dict:
|
||||
# not every SELECT is asking for the FCP in response...
|
||||
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
|
||||
return None
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.2
|
||||
class SimStatus(ApduCommand, n='STATUS', ins=0xF2, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
if self.successful:
|
||||
if 'body' in self.rsp_dict:
|
||||
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
|
||||
|
||||
def _decode_binary_p1p2(p1, p2) -> Dict:
|
||||
ret = {}
|
||||
if p1 & 0x80:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = p1 & 0x1f
|
||||
ret['offset'] = p2
|
||||
else:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
ret['offset'] = ((p1 & 0x7f) << 8) & p2
|
||||
return ret
|
||||
|
||||
# TS 51.011 Section 9.2.3 / 31.101
|
||||
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
def _decode_p1p2(self):
|
||||
return _decode_binary_p1p2(self.p1, self.p2)
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, TransparentEF):
|
||||
return b2h(self.rsp_data)
|
||||
# our decoders don't work for non-zero offsets / short reads
|
||||
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
|
||||
return b2h(self.rsp_data)
|
||||
method = getattr(self.file, 'decode_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.rsp_data)
|
||||
|
||||
# TS 51.011 Section 9.2.4 / 31.101
|
||||
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
def _decode_p1p2(self):
|
||||
return _decode_binary_p1p2(self.p1, self.p2)
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, TransparentEF):
|
||||
return b2h(self.rsp_data)
|
||||
# our decoders don't work for non-zero offsets / short writes
|
||||
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
|
||||
return b2h(self.cmd_data)
|
||||
method = getattr(self.file, 'decode_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.cmd_data)
|
||||
|
||||
def _decode_record_p1p2(p1, p2):
|
||||
ret = {}
|
||||
ret['record_number'] = p1
|
||||
if p2 >> 3 == 0:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
else:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = p2 >> 3
|
||||
mode = p2 & 0x7
|
||||
if mode == 2:
|
||||
ret['mode'] = 'next_record'
|
||||
elif mode == 3:
|
||||
ret['mode'] = 'previous_record'
|
||||
elif mode == 8:
|
||||
ret['mode'] = 'absolute_current'
|
||||
return ret
|
||||
|
||||
# TS 51.011 Section 9.2.5
|
||||
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
def _decode_p1p2(self):
|
||||
r = _decode_record_p1p2(self.p1, self.p2)
|
||||
self.col_id = '%02u' % r['record_number']
|
||||
return r
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, LinFixedEF):
|
||||
return b2h(self.rsp_data)
|
||||
method = getattr(self.file, 'decode_record_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.rsp_data)
|
||||
|
||||
# TS 51.011 Section 9.2.6
|
||||
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
def _decode_p1p2(self):
|
||||
r = _decode_record_p1p2(self.p1, self.p2)
|
||||
self.col_id = '%02u' % r['record_number']
|
||||
return r
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, LinFixedEF):
|
||||
return b2h(self.cmd_data)
|
||||
method = getattr(self.file, 'decode_record_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.cmd_data)
|
||||
|
||||
# TS 51.011 Section 9.2.7
|
||||
class Seek(ApduCommand, n='SEEK', ins=0xA2, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
_construct_rsp = GreedyRange(Int8ub)
|
||||
|
||||
def _decode_p1p2(self):
|
||||
ret = {}
|
||||
sfi = self.p2 >> 3
|
||||
if sfi == 0:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
else:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = sfi
|
||||
mode = self.p2 & 0x7
|
||||
if mode in [0x4, 0x5]:
|
||||
if mode == 0x4:
|
||||
ret['mode'] = 'forward_search'
|
||||
else:
|
||||
ret['mode'] = 'backward_search'
|
||||
ret['record_number'] = self.p1
|
||||
self.col_id = '%02u' % ret['record_number']
|
||||
elif mode == 6:
|
||||
ret['mode'] = 'enhanced_search'
|
||||
# TODO: further decode
|
||||
elif mode == 7:
|
||||
ret['mode'] = 'proprietary_search'
|
||||
return ret
|
||||
|
||||
def _decode_cmd(self):
|
||||
ret = self._decode_p1p2()
|
||||
if self.cmd_data:
|
||||
if ret['mode'] == 'enhanced_search':
|
||||
ret['search_indication'] = b2h(self.cmd_data[:2])
|
||||
ret['search_string'] = b2h(self.cmd_data[2:])
|
||||
else:
|
||||
ret['search_string'] = b2h(self.cmd_data)
|
||||
return ret
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
return self.to_dict()
|
||||
|
||||
# TS 51.011 Section 9.2.8
|
||||
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
|
||||
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
|
||||
|
||||
# TS 51.011 Section 9.2.9
|
||||
class VerifyChv(ApduCommand, n='VERIFY CHV', ins=0x20, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
@staticmethod
|
||||
def _pin_process(apdu):
|
||||
processed = {
|
||||
'scope': apdu.cmd_dict['p2']['scope'],
|
||||
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
|
||||
}
|
||||
if apdu.lc == 0:
|
||||
# this is just a question on the counters remaining
|
||||
processed['mode'] = 'check_remaining_attempts'
|
||||
else:
|
||||
processed['pin'] = b2h(apdu.cmd_data)
|
||||
if apdu.sw[0] == 0x63:
|
||||
processed['remaining_attempts'] = apdu.sw[1] & 0xf
|
||||
return processed
|
||||
|
||||
@staticmethod
|
||||
def _pin_is_success(sw):
|
||||
if sw[0] == 0x63:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.10
|
||||
class ChangeChv(ApduCommand, n='CHANGE CHV', ins=0x24, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.11
|
||||
class DisableChv(ApduCommand, n='DISABLE CHV', ins=0x26, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.12
|
||||
class EnableChv(ApduCommand, n='ENABLE CHV', ins=0x28, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.13
|
||||
class UnblockChv(ApduCommand, n='UNBLOCK CHV', ins=0x2C, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.14
|
||||
class Invalidate(ApduCommand, n='INVALIDATE', ins=0x04, cla=['A0']):
|
||||
_apdu_case = 1
|
||||
_construct_p1 = BitStruct(BitsInteger(4),
|
||||
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
|
||||
path_from_mf=8, path_from_current_df=9))
|
||||
|
||||
# TS 51.011 Section 9.2.15
|
||||
class Rehabilitate(ApduCommand, n='REHABILITATE', ins=0x44, cla=['A0']):
|
||||
_apdu_case = 1
|
||||
_construct_p1 = Invalidate._construct_p1
|
||||
|
||||
# TS 51.011 Section 9.2.16
|
||||
class RunGsmAlgorithm(ApduCommand, n='RUN GSM ALGORITHM', ins=0x88, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
_construct = Struct('rand'/Bytes(16))
|
||||
_construct_rsp = Struct('sres'/Bytes(4), 'kc'/Bytes(8))
|
||||
|
||||
# TS 51.011 Section 9.2.17
|
||||
class Sleep(ApduCommand, n='SLEEP', ins=0xFA, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.18
|
||||
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.19
|
||||
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
|
||||
# TS 51.011 Section 9.2.20
|
||||
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
# TS 51.011 Section 9.2.21
|
||||
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.22
|
||||
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
|
||||
|
||||
ApduCommands = ApduCommandSet('TS 51.011', cmds=[SimSelect, SimStatus, ReadBinary, UpdateBinary, ReadRecord,
|
||||
UpdateRecord, Seek, Increase, VerifyChv, ChangeChv, DisableChv,
|
||||
EnableChv, UnblockChv, Invalidate, Rehabilitate, RunGsmAlgorithm,
|
||||
Sleep, GetResponse, TerminalProfile, Envelope, Fetch, TerminalResponse])
|
||||
@@ -1,39 +0,0 @@
|
||||
# coding=utf-8
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from pySim.utils import h2b
|
||||
from pySim.gsmtap import GsmtapSource
|
||||
|
||||
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
|
||||
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
|
||||
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
|
||||
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
|
||||
|
||||
from . import ApduSource, PacketType, CardReset
|
||||
|
||||
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
|
||||
|
||||
class StdinHexApduSource(ApduSource):
|
||||
"""ApduSource for reading apdu hex-strings from stdin."""
|
||||
|
||||
def read_packet(self) -> PacketType:
|
||||
while True:
|
||||
command = input("C-APDU >")
|
||||
response = '9000'
|
||||
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))
|
||||
raise StopIteration
|
||||
@@ -31,7 +31,6 @@ from pySim.exceptions import SwMatchError
|
||||
# CardModel is created, which will add the ATR-based matching and
|
||||
# calling of SysmocomSJA2.add_files. See CardModel.apply_matching_models
|
||||
import pySim.sysmocom_sja2
|
||||
#import pySim.sysmocom_euicc1
|
||||
|
||||
# we need to import these modules so that the various sub-classes of
|
||||
# CardProfile are created, which will be used in init_card() to iterate
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
mainly) ETSI TS 102 223, ETSI TS 101 220 and USIM Application Toolkit (SAT)
|
||||
as described in 3GPP TS 31.111."""
|
||||
|
||||
# (C) 2021-2024 by Harald Welte <laforge@osmocom.org>
|
||||
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
||||
@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
|
||||
block_nr = self.block_nr
|
||||
ciphertext = self._encrypt(padded_data)
|
||||
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
||||
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
|
||||
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
|
||||
return ciphertext
|
||||
|
||||
def decrypt(self, data:bytes) -> bytes:
|
||||
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
|
||||
temp_data = self.mac_chain + tag_and_length + data
|
||||
old_mcv = self.mac_chain
|
||||
c_mac = self._auth(temp_data)
|
||||
|
||||
# DEBUG: Show MAC computation details
|
||||
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||
|
||||
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
||||
ret = tag_and_length + data + c_mac
|
||||
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||
|
||||
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
||||
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
|
||||
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
|
||||
return ret
|
||||
|
||||
def verify(self, ciphertext: bytes) -> bool:
|
||||
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
|
||||
s_enc = out[l:2*l]
|
||||
s_mac = out[l*2:3*l]
|
||||
|
||||
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||
|
||||
return s_enc, s_mac, initial_mac_chaining_value
|
||||
|
||||
|
||||
@@ -231,9 +246,21 @@ class BspInstance:
|
||||
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||
assert tag <= 255
|
||||
assert len(plaintext) <= self.max_payload_size
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
|
||||
|
||||
# DEBUG: Show what we're processing
|
||||
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||
|
||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
|
||||
ciphered = self.c_algo.encrypt(plaintext)
|
||||
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||
|
||||
maced = self.m_algo.auth(tag, ciphered)
|
||||
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||
|
||||
return maced
|
||||
|
||||
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
||||
|
||||
@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
|
||||
from pySim.esim.bsp import BspInstance
|
||||
from pySim.esim import PMO
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
||||
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
||||
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
||||
@@ -196,8 +199,12 @@ class BoundProfilePackage(ProfilePackage):
|
||||
# 'initialiseSecureChannelRequest'
|
||||
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
||||
# firstSequenceOf87
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
||||
# sequenceOF88
|
||||
logger.debug(f"BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
||||
|
||||
if self.ppp: # we have to use session keys
|
||||
|
||||
@@ -94,9 +94,57 @@ class RspSessionState:
|
||||
self.__dict__.update(state)
|
||||
|
||||
|
||||
class RspSessionStore(shelve.DbfilenameShelf):
|
||||
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
|
||||
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
|
||||
class RspSessionStore:
|
||||
"""A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
|
||||
Can be configured to use either file-based storage or in-memory storage.
|
||||
We use it to store RspSessionState objects indexed by transactionId."""
|
||||
|
||||
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
|
||||
self._in_memory = in_memory
|
||||
|
||||
if in_memory:
|
||||
self._shelf = shelve.Shelf(dict())
|
||||
else:
|
||||
if filename is None:
|
||||
raise ValueError("filename is required for file-based session store")
|
||||
self._shelf = shelve.open(filename)
|
||||
|
||||
# dunder magic
|
||||
def __getitem__(self, key):
|
||||
return self._shelf[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._shelf[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self._shelf[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._shelf
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._shelf)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._shelf)
|
||||
|
||||
# everything else
|
||||
def __getattr__(self, name):
|
||||
"""Delegate attribute access to the underlying shelf object."""
|
||||
return getattr(self._shelf, name)
|
||||
|
||||
def close(self):
|
||||
"""Close the session store."""
|
||||
if hasattr(self._shelf, 'close'):
|
||||
self._shelf.close()
|
||||
if self._in_memory:
|
||||
# For in-memory store, clear the reference
|
||||
self._shelf = None
|
||||
|
||||
def sync(self):
|
||||
"""Synchronize the cache with the underlying storage."""
|
||||
if hasattr(self._shelf, 'sync'):
|
||||
self._shelf.sync()
|
||||
|
||||
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
|
||||
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This
|
||||
|
||||
@@ -21,8 +21,6 @@ import io
|
||||
import os
|
||||
from typing import Tuple, List, Optional, Dict, Union
|
||||
from collections import OrderedDict
|
||||
from difflib import SequenceMatcher, Match
|
||||
|
||||
import asn1tools
|
||||
import zipfile
|
||||
from pySim import javacard
|
||||
@@ -46,29 +44,6 @@ asn1 = compile_asn1_subdir('saip')
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class NonMatch(Match):
|
||||
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
|
||||
@classmethod
|
||||
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
|
||||
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
|
||||
The caller must ensure that the input list is ordered, non-overlapping and only contains
|
||||
matches at equal offsets in a and b."""
|
||||
res = []
|
||||
cur = 0
|
||||
for match in l:
|
||||
if match.a != match.b:
|
||||
return ValueError('only works for equal-offset matches')
|
||||
assert match.a >= cur
|
||||
nm_len = match.a - cur
|
||||
if nm_len > 0:
|
||||
# there's no point in generating zero-lenth non-matching sections
|
||||
res.append(cls(a=cur, b=cur, size=nm_len))
|
||||
cur = match.a + match.size
|
||||
if size > cur:
|
||||
res.append(cls(a=cur, b=cur, size=size-cur))
|
||||
|
||||
return res
|
||||
|
||||
class Naa:
|
||||
"""A class defining a Network Access Application (NAA)"""
|
||||
name = None
|
||||
@@ -431,35 +406,12 @@ class File:
|
||||
return ValueError("Unknown key '%s' in tuple list" % k)
|
||||
return stream.getvalue()
|
||||
|
||||
def file_content_to_tuples(self, optimize:bool = True) -> List[Tuple]:
|
||||
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
|
||||
return []
|
||||
if not optimize:
|
||||
# simplistic approach: encode the full file, ignoring the template/default
|
||||
return [('fillFileContent', self.body)]
|
||||
# Try to 'compress' the file body, based on the default file contents.
|
||||
if self.template:
|
||||
default = self.template.expand_default_value_pattern(length=len(self.body))
|
||||
if not default:
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
else:
|
||||
if default == self.body:
|
||||
# 100% match: return an empty tuple list to make eUICC use the default
|
||||
return []
|
||||
sm = SequenceMatcher(a=default, b=self.body)
|
||||
else:
|
||||
# no template at all: we can only remove padding
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
matching_blocks = sm.get_matching_blocks()
|
||||
# we can only make use of matches that have the same offset in 'a' and 'b'
|
||||
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
|
||||
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
|
||||
ret = []
|
||||
cur = 0
|
||||
for block in non_matching_blocks:
|
||||
ret.append(('fillFileOffset', block.a - cur))
|
||||
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
|
||||
return ret
|
||||
def file_content_to_tuples(self) -> List[Tuple]:
|
||||
# FIXME: simplistic approach. needs optimization. We should first check if the content
|
||||
# matches the expanded default value from the template. If it does, return empty list.
|
||||
# Next, we should compute the diff between the default value and self.body, and encode
|
||||
# that as a sequence of fillFileOffset and fillFileContent tuples.
|
||||
return [('fillFileContent', self.body)]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "File(%s)" % self.pe_name
|
||||
|
||||
@@ -25,6 +25,7 @@ from cryptography.hazmat.primitives.serialization import load_pem_private_key, E
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
|
||||
|
||||
from pySim.utils import b2h
|
||||
from . import x509_err
|
||||
|
||||
def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool:
|
||||
"""Verify if 'signed' certificate was signed using 'signer'."""
|
||||
@@ -64,9 +65,6 @@ class oid:
|
||||
id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
|
||||
id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
|
||||
|
||||
class VerifyError(Exception):
|
||||
"""An error during certificate verification,"""
|
||||
|
||||
class CertificateSet:
|
||||
"""A set of certificates consisting of a trusted [self-signed] CA root certificate,
|
||||
and an optional number of intermediate certificates. Can be used to verify the certificate chain
|
||||
@@ -135,7 +133,7 @@ class CertificateSet:
|
||||
# we cannot check if there's no CRL
|
||||
return
|
||||
if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr):
|
||||
raise VerifyError('Certificate is present in CRL, verification failed')
|
||||
raise x509_err.CertificateRevoked()
|
||||
|
||||
def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100):
|
||||
"""Verify if a given certificate's signature chain can be traced back to the root CA of this
|
||||
@@ -149,14 +147,14 @@ class CertificateSet:
|
||||
check_signed(c, self.root_cert)
|
||||
return
|
||||
parent_cert = self.intermediate_certs.get(aki, None)
|
||||
if not aki:
|
||||
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
|
||||
if not parent_cert:
|
||||
raise x509_err.MissingIntermediateCert(b2h(aki))
|
||||
check_signed(c, parent_cert)
|
||||
# if we reach here, we passed (no exception raised)
|
||||
c = parent_cert
|
||||
depth += 1
|
||||
if depth > max_depth:
|
||||
raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth)
|
||||
raise x509_err.MaxDepthExceeded(max_depth, depth)
|
||||
|
||||
|
||||
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
|
||||
|
||||
58
pySim/esim/x509_err.py
Normal file
58
pySim/esim/x509_err.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""X.509 certificate verification exceptions for GSMA eSIM."""
|
||||
|
||||
class VerifyError(Exception):
|
||||
"""Base class for certificate verification errors."""
|
||||
pass
|
||||
|
||||
|
||||
class MissingIntermediateCert(VerifyError):
|
||||
"""Raised when an intermediate certificate in the chain cannot be found."""
|
||||
def __init__(self, auth_key_id: str):
|
||||
self.auth_key_id = auth_key_id
|
||||
super().__init__(f'Could not find intermediate certificate for AuthKeyId {auth_key_id}')
|
||||
|
||||
|
||||
class CertificateRevoked(VerifyError):
|
||||
"""Raised when a certificate is found in the CRL."""
|
||||
def __init__(self, cert_serial: str = None):
|
||||
self.cert_serial = cert_serial
|
||||
msg = 'Certificate is present in CRL, verification failed'
|
||||
if cert_serial:
|
||||
msg += f' (serial: {cert_serial})'
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class MaxDepthExceeded(VerifyError):
|
||||
"""Raised when certificate chain depth exceeds the maximum allowed."""
|
||||
def __init__(self, max_depth: int, actual_depth: int):
|
||||
self.max_depth = max_depth
|
||||
self.actual_depth = actual_depth
|
||||
super().__init__(f'Maximum depth {max_depth} exceeded while verifying certificate chain (actual: {actual_depth})')
|
||||
|
||||
|
||||
class SignatureVerification(VerifyError):
|
||||
"""Raised when certificate signature verification fails."""
|
||||
def __init__(self, cert_subject: str = None, signer_subject: str = None):
|
||||
self.cert_subject = cert_subject
|
||||
self.signer_subject = signer_subject
|
||||
msg = 'Certificate signature verification failed'
|
||||
if cert_subject and signer_subject:
|
||||
msg += f': {cert_subject} not signed by {signer_subject}'
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class InvalidCertificate(VerifyError):
|
||||
"""Raised when a certificate is invalid (missing required fields, wrong type, etc)."""
|
||||
def __init__(self, reason: str):
|
||||
self.reason = reason
|
||||
super().__init__(f'Invalid certificate: {reason}')
|
||||
|
||||
|
||||
class CertificateExpired(VerifyError):
|
||||
"""Raised when a certificate has expired."""
|
||||
def __init__(self, cert_subject: str = None):
|
||||
self.cert_subject = cert_subject
|
||||
msg = 'Certificate has expired'
|
||||
if cert_subject:
|
||||
msg += f': {cert_subject}'
|
||||
super().__init__(msg)
|
||||
15
pySim/ota.py
15
pySim/ota.py
@@ -302,7 +302,7 @@ class OtaAlgoAuthDES3(OtaAlgoAuth):
|
||||
class OtaAlgoCryptAES(OtaAlgoCrypt):
|
||||
name = 'AES'
|
||||
enum_name = 'aes_cbc'
|
||||
blocksize = 16
|
||||
blocksize = 16 # TODO: is this needed?
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
|
||||
return cipher.encrypt(data)
|
||||
@@ -356,20 +356,20 @@ class OtaDialectSms(OtaDialect):
|
||||
|
||||
# CHL + SPI (+ KIC + KID)
|
||||
part_head = self.hdr_construct.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
|
||||
print("part_head: %s" % b2h(part_head))
|
||||
#print("part_head: %s" % b2h(part_head))
|
||||
|
||||
# CNTR + PCNTR (CNTR not used)
|
||||
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
|
||||
print("part_cnt: %s" % b2h(part_cnt))
|
||||
#print("part_cnt: %s" % b2h(part_cnt))
|
||||
|
||||
envelope_data = part_head + part_cnt + apdu
|
||||
print("envelope_data: %s" % b2h(envelope_data))
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
|
||||
# CPL from and including CPI to end of secured data, including any padding for ciphering
|
||||
cpl = len(envelope_data) + len_sig
|
||||
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
|
||||
print("envelope_data with cpl: %s" % b2h(envelope_data))
|
||||
#print("envelope_data with cpl: %s" % b2h(envelope_data))
|
||||
|
||||
if spi['rc_cc_ds'] == 'cc':
|
||||
cc = otak.auth.sign(envelope_data)
|
||||
@@ -383,7 +383,7 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
|
||||
|
||||
print("envelope_data with sig: %s" % b2h(envelope_data))
|
||||
#print("envelope_data with sig: %s" % b2h(envelope_data))
|
||||
|
||||
# encrypt as needed
|
||||
if spi['ciphering']: # ciphering is requested
|
||||
@@ -395,7 +395,7 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
envelope_data = part_head + envelope_data
|
||||
|
||||
print("envelope_data: %s" % b2h(envelope_data))
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
if len(envelope_data) > 140:
|
||||
raise ValueError('Cannot encode command in a single SMS; Fragmentation not implemented')
|
||||
@@ -492,7 +492,6 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
|
||||
|
||||
print(res)
|
||||
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
|
||||
if res.response_status == 'por_ok' and len(res['secured_data']):
|
||||
dec = CompactRemoteResp.parse(res['secured_data'])
|
||||
|
||||
@@ -30,13 +30,6 @@ from smpp.pdu import pdu_types, operations
|
||||
|
||||
BytesOrHex = typing.Union[Hexstr, bytes]
|
||||
|
||||
# 07
|
||||
# 00 03 000201 # part 01 of 02 in reference 00
|
||||
# 70 00
|
||||
|
||||
# 05
|
||||
# 00 03 000202
|
||||
|
||||
class UserDataHeader:
|
||||
# a single IE in the user data header
|
||||
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
|
||||
|
||||
@@ -162,28 +162,6 @@ class EF_SIM_AUTH_KEY(TransparentEF):
|
||||
'key'/Bytes(16),
|
||||
'op_opc' /Bytes(16))
|
||||
|
||||
class EF_HTTPS_CFG(TransparentEF):
|
||||
def __init__(self, fid='6f2a', name='EF.HTTPS_CFG'):
|
||||
super().__init__(fid, name=name, desc='HTTPS configuration')
|
||||
|
||||
class EF_HTTPS_KEYS(TransparentEF):
|
||||
KeyRecord = Struct('security_domain'/Int8ub,
|
||||
'key_type'/Enum(Int8ub, des=0x80, psk=0x85, aes=0x88),
|
||||
'key_version'/Int8ub,
|
||||
'key_id'/Int8ub,
|
||||
'key_length'/Int8ub,
|
||||
'key'/Bytes(this.key_length))
|
||||
def __init__(self, fid='6f2b', name='EF.HTTPS_KEYS'):
|
||||
super().__init__(fid, name=name, desc='HTTPS PSK and DEK keys')
|
||||
self._construct = GreedyRange(self.KeyRecord)
|
||||
|
||||
class EF_HTTPS_POLL(TransparentEF):
|
||||
TimeUnit = Enum(Int8ub, seconds=0, minutes=1, hours=2, days=3, ten_days=4)
|
||||
def __init__(self, fid='6f2c', name='EF.HTTPS_POLL'):
|
||||
super().__init__(fid, name=name, desc='HTTPS polling interval')
|
||||
self._construct = Struct(Const(b'\x82'), 'time_unit'/self.TimeUnit, 'value'/Int8ub,
|
||||
'adm_session_triggering_tlv'/GreedyBytes)
|
||||
|
||||
|
||||
class DF_SYSTEM(CardDF):
|
||||
def __init__(self):
|
||||
@@ -202,9 +180,6 @@ class DF_SYSTEM(CardDF):
|
||||
EF_0348_COUNT(),
|
||||
EF_GP_COUNT(),
|
||||
EF_GP_DIV_DATA(),
|
||||
EF_HTTPS_CFG(),
|
||||
EF_HTTPS_KEYS(),
|
||||
EF_HTTPS_POLL(),
|
||||
]
|
||||
self.add_files(files)
|
||||
|
||||
|
||||
@@ -333,13 +333,11 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
from pySim.transport.pcsc import PcscSimLink
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
from pySim.transport.calypso import CalypsoSimLink
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
|
||||
SerialSimLink.argparse_add_reader_args(arg_parser)
|
||||
PcscSimLink.argparse_add_reader_args(arg_parser)
|
||||
ModemATCommandLink.argparse_add_reader_args(arg_parser)
|
||||
CalypsoSimLink.argparse_add_reader_args(arg_parser)
|
||||
WsrcSimLink.argparse_add_reader_args(arg_parser)
|
||||
arg_parser.add_argument('--apdu-trace', action='store_true',
|
||||
help='Trace the command/response APDUs exchanged with the card')
|
||||
|
||||
@@ -362,9 +360,6 @@ def init_reader(opts, **kwargs) -> LinkBase:
|
||||
elif opts.modem_dev is not None:
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
sl = ModemATCommandLink(opts, **kwargs)
|
||||
elif opts.wsrc_server_url is not None:
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
sl = WsrcSimLink(opts, **kwargs)
|
||||
else: # Serial reader is default
|
||||
print("No reader/driver specified; falling back to default (Serial reader)")
|
||||
from pySim.transport.serial import SerialSimLink
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import argparse
|
||||
from typing import Optional
|
||||
|
||||
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
|
||||
|
||||
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
|
||||
from pySim.transport import LinkBase
|
||||
from pySim.utils import ResTuple
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
|
||||
class UserWsClientBlocking(WsClientBlocking):
|
||||
def perform_outbound_hello(self):
|
||||
hello_data = {
|
||||
'client_type': 'user',
|
||||
}
|
||||
super().perform_outbound_hello(hello_data)
|
||||
|
||||
def select_card(self, id_type:str, id_str:str):
|
||||
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
|
||||
'select_card_ack')
|
||||
return rx
|
||||
|
||||
def reset_card(self):
|
||||
self.transceive_json('reset_req', {}, 'reset_resp')
|
||||
|
||||
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return rx['response'], rx['sw']
|
||||
|
||||
|
||||
class WsrcSimLink(LinkBase):
|
||||
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
|
||||
name = 'WSRC'
|
||||
|
||||
def __init__(self, opts: argparse.Namespace, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.identities = {}
|
||||
self.server_url = opts.wsrc_server_url
|
||||
if opts.wsrc_eid:
|
||||
self.id_type = 'eid'
|
||||
self.id_str = opts.wsrc_eid
|
||||
elif opts.wsrc_iccid:
|
||||
self.id_type = 'iccid'
|
||||
self.id_str = opts.wsrc_iccid
|
||||
self.client = UserWsClientBlocking(self.server_url)
|
||||
self.client.connect()
|
||||
|
||||
def __del__(self):
|
||||
# FIXME: disconnect from server
|
||||
pass
|
||||
|
||||
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
|
||||
self.connect()
|
||||
|
||||
def connect(self):
|
||||
rx = self.client.select_card(self.id_type, self.id_str)
|
||||
self.identitites = rx['identities']
|
||||
|
||||
def get_atr(self) -> Hexstr:
|
||||
return self.identities['atr']
|
||||
|
||||
def disconnect(self):
|
||||
self.__delete__()
|
||||
|
||||
def _reset_card(self):
|
||||
self.client.reset_card()
|
||||
return 1
|
||||
|
||||
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
|
||||
return self.client.xceive_apdu_raw(pdu)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
|
||||
|
||||
@staticmethod
|
||||
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
|
||||
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
|
||||
can be accessed via a network.""")
|
||||
wsrc_group.add_argument('--wsrc-server-url',
|
||||
help='URI of the WSRC server to connect to')
|
||||
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
|
||||
help='ICCID of the card to open via WSRC')
|
||||
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
|
||||
help='EID of the card to open via WSRC')
|
||||
@@ -1,65 +0,0 @@
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
from websockets.sync.client import connect
|
||||
from osmocom.utils import b2h
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WsClientBlocking(abc.ABC):
|
||||
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, ws_uri: str):
|
||||
self.ws_uri = ws_uri
|
||||
self.ws = None
|
||||
|
||||
def connect(self):
|
||||
self.ws = connect(uri=self.ws_uri)
|
||||
self.perform_outbound_hello()
|
||||
|
||||
def tx_json(self, msg_type: str, d: dict = {}):
|
||||
"""JSON-Encode and transmit a message to the given websocket."""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
logger.debug("Tx: %s", d_js)
|
||||
self.ws.send(d_js)
|
||||
|
||||
def tx_error(self, message: str):
|
||||
event = {
|
||||
'message': message,
|
||||
}
|
||||
self.tx_json('error', event)
|
||||
|
||||
def rx_json(self):
|
||||
"""Receive a single message from the given websocket and JSON-decode it."""
|
||||
rx = self.ws.recv()
|
||||
rx_js = json.loads(rx)
|
||||
logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
|
||||
self.tx_json(tx_msg_type, tx_d)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == rx_msg_type
|
||||
return rx
|
||||
|
||||
def perform_outbound_hello(self, tx: dict = {}):
|
||||
self.tx_json('hello', tx)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == 'hello_ack'
|
||||
return rx
|
||||
|
||||
def rx_and_execute_cmd(self):
|
||||
"""Receve and dispatch/execute a single command from the server."""
|
||||
rx = self.rx_json()
|
||||
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
|
||||
if handler:
|
||||
handler(rx)
|
||||
else:
|
||||
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
|
||||
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])
|
||||
@@ -15,4 +15,3 @@ git+https://github.com/osmocom/asn1tools
|
||||
packaging
|
||||
git+https://github.com/hologram-io/smpp.pdu
|
||||
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
||||
smpplib
|
||||
|
||||
8
setup.py
8
setup.py
@@ -49,4 +49,12 @@ setup(
|
||||
'asn1/saip/*.asn',
|
||||
],
|
||||
},
|
||||
extras_require={
|
||||
"smdpp": [
|
||||
"klein",
|
||||
"service-identity",
|
||||
"pyopenssl",
|
||||
"requests",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
16
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
Normal file
16
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
Normal file
@@ -0,0 +1,16 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgjCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjQwNzA5MTUyOTM2WhcNMjUwODExMTUyOTM2WjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
|
||||
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
|
||||
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
|
||||
HQ8BAf8EBAMCB4AwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMBQG
|
||||
A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
|
||||
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
|
||||
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
|
||||
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
|
||||
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0cA
|
||||
MEQCIHHmXEy9mgudh/VbK0hJwmX7eOgbvHLnlujrpQzvUd4uAiBFVJgSdzYvrmJ9
|
||||
5yeIvmjHwxSMBgQp2dde7OtdVEK8Kw==
|
||||
-----END CERTIFICATE-----
|
||||
Binary file not shown.
@@ -1,7 +1,7 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICgjCCAiigAwIBAgIBCjAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MIICgzCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||
SVQwHhcNMjUwNDIzMTUyMzA1WhcNMzUwNDIxMTUyMzA1WjAzMQ0wCwYDVQQKDARB
|
||||
SVQwHhcNMjQwNzA5MTUyODMzWhcNMjUwODExMTUyODMzWjAzMQ0wCwYDVQQKDARB
|
||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
|
||||
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
|
||||
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
|
||||
@@ -10,7 +10,7 @@ VR0gBA0wCzAJBgdngRIBAgEDMB0GA1UdDgQWBBQn/vHyKRh+x4Pt9uApZKRRjVfU
|
||||
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
|
||||
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
|
||||
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
|
||||
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
|
||||
RQIgYakBZP6y9/2iu9aZkgb89SQgfRb0TKVMGElWgtyoX2gCIQCT8o/TkAxhWCTY
|
||||
yaBDMi1L9Ub+93ef5s+S+eHLmwuudA==
|
||||
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSQAw
|
||||
RgIhAL1qQ/cnrCZC7UnnLJ8WeK+0aWUJFWh1cOlBEzw0NlTVAiEA25Vf4WHzwmJi
|
||||
zkARzxJ1qB0qfBofuJrtfPM4gNJ4Quw=
|
||||
-----END CERTIFICATE-----
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1
smdpp-data/sm-dp-sessions-BRP
Symbolic link
1
smdpp-data/sm-dp-sessions-BRP
Symbolic link
@@ -0,0 +1 @@
|
||||
/tmp/sm-dp-sessions-BRP
|
||||
1
smdpp-data/sm-dp-sessions-NIST
Symbolic link
1
smdpp-data/sm-dp-sessions-NIST
Symbolic link
@@ -0,0 +1 @@
|
||||
/tmp/sm-dp-sessions-NIST
|
||||
@@ -1,142 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import sys
|
||||
from pprint import pprint as pp
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
import smpplib.gsm
|
||||
import smpplib.client
|
||||
import smpplib.consts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# if you want to know what's happening
|
||||
logging.basicConfig(level='DEBUG')
|
||||
|
||||
class Foo:
|
||||
def smpp_rx_handler(self, pdu):
|
||||
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
|
||||
if pdu.short_message:
|
||||
try:
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||
except ValueError:
|
||||
spi = self.spi.copy()
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||
pp(dec)
|
||||
return None
|
||||
|
||||
def __init__(self):
|
||||
# Two parts, UCS2, SMS with UDH
|
||||
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
|
||||
|
||||
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
|
||||
|
||||
# Print when obtain message_id
|
||||
client.set_message_sent_handler(
|
||||
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
|
||||
#client.set_message_received_handler(
|
||||
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
|
||||
client.set_message_received_handler(self.smpp_rx_handler)
|
||||
|
||||
client.connect()
|
||||
client.bind_transceiver(system_id='test', password='test')
|
||||
|
||||
self.client = client
|
||||
|
||||
if False:
|
||||
KIC1 = h2b('000102030405060708090a0b0c0d0e0f')
|
||||
KID1 = h2b('101112131415161718191a1b1c1d1e1f')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
|
||||
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
|
||||
|
||||
if False:
|
||||
KIC1 = h2b('4BE2D58A1FA7233DD723B3C70996E6E6')
|
||||
KID1 = h2b('4a664208eba091d32c4ecbc299da1f34')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
|
||||
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
|
||||
#KIC3 = h2b('A4074D8E1FE69B484A7E62682ED09B51')
|
||||
#KID3 = h2b('41FF1033910112DB4EBEBB7807F939CD')
|
||||
#self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
# algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
#self.tar = h2b('B00011') # USIM RFM
|
||||
self.tar = h2b('000000') # RAM
|
||||
|
||||
if False: # sysmoEUICC1-C2G
|
||||
KIC1 = h2b('B52F9C5938D1C19ED73E1AE772937FD7')
|
||||
KID1 = h2b('3BC696ACD1EEC95A6624F7330D22FC81')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
|
||||
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
|
||||
|
||||
if False: # TS.48 profile
|
||||
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
|
||||
KID1 = h2b('112233445566778899aabbccddeeff10')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
|
||||
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000000') # ISD-P according to FIXME
|
||||
|
||||
if False: # TS.48 profile AES
|
||||
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
|
||||
KID1 = h2b('112233445566778899aabbccddeeff10')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=2, kid=KID1)
|
||||
self.tar = h2b('000000') # ISD-P according to FIXME
|
||||
|
||||
if True: # eSIM profile
|
||||
KIC1 = h2b('207c5d2c1aa80d58cd8f542fb9ef2f80')
|
||||
KID1 = h2b('312367f1681902fd67d9a71c62a840e3')
|
||||
#KIC1 = h2b('B6B0E130EEE1C9A4A29DAEC034D35C9E')
|
||||
#KID1 = h2b('889C51CD2A381A9D4EDDA9224C24A9AF')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.ota_keyset.cntr = 7
|
||||
self.tar = h2b('b00001') # ADF.USIM
|
||||
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.spi = {'counter':'counter_must_be_higher', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
self.client.send_message(
|
||||
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure it is a byte string, not unicode:
|
||||
source_addr='12',
|
||||
|
||||
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure thease two params are byte strings, not unicode:
|
||||
destination_addr='23',
|
||||
short_message=tpdu,
|
||||
|
||||
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||
protocol_id=0x7f,
|
||||
#registered_delivery=True,
|
||||
)
|
||||
|
||||
def tx_c_apdu(self, apdu: bytes):
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
|
||||
|
||||
f = Foo()
|
||||
print("initialized")
|
||||
#f.tx_c_apdu(h2b('80a40400023f00'))
|
||||
#f.tx_c_apdu(h2b('80EC010100'))
|
||||
#f.tx_c_apdu(h2b('80EC0101' + '0E' + '350103' + '390203e8' + '3e052101020304'))
|
||||
f.tx_c_apdu(h2b('00a40004026f07' + '00b0000009'))
|
||||
f.client.listen()
|
||||
@@ -90,34 +90,5 @@ class OidTest(unittest.TestCase):
|
||||
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
|
||||
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
|
||||
|
||||
class NonMatchTest(unittest.TestCase):
|
||||
def test_nonmatch(self):
|
||||
# non-matches before, in between and after matches
|
||||
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 26)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
|
||||
NonMatch(a=24, b=24, size=2)])
|
||||
|
||||
def test_nonmatch_beg(self):
|
||||
# single match at beginning
|
||||
match_list = [Match(a=0, b=0, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 20)
|
||||
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
|
||||
|
||||
def test_nonmatch_end(self):
|
||||
# single match at end
|
||||
match_list = [Match(a=19, b=19, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
|
||||
|
||||
def test_nonmatch_none(self):
|
||||
# no match at all
|
||||
match_list = []
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -16,16 +16,6 @@ SPI_CC_POR_CIPHERED_CC = {
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_CTR_POR_UNCIPHERED_CC = {
|
||||
'counter':'counter_must_be_higher',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
@@ -46,17 +36,6 @@ SPI_CC_POR_UNCIPHERED_NOCC = {
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_UNCIPHERED = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':False,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'no_rc_cc_ds',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
|
||||
######################################################################
|
||||
# old-style code-driven test (lots of code copy+paste)
|
||||
######################################################################
|
||||
@@ -95,48 +74,6 @@ class Test_SMS_AES128(unittest.TestCase):
|
||||
self.assertEqual(b2h(dec_tar), b2h(self.tar))
|
||||
self.assertEqual(dec_spi, spi)
|
||||
|
||||
def test_open_channel(self):
|
||||
spi = self.spi_base
|
||||
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('000102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('101112131415161718191a1b1c1d1e1f'))
|
||||
tpdu = h2b('40048111227ff6407070611535007e070003000201700000781516011212000001ccda206e8b0d46304247bf00bfdc9853eed2a826f9af8dc7c2974ce2cb9bb55cc1a8577e047cc8f5d450380ba86b25354fe69f58884f671d7ace0c911f7c74830dc1d58b62cce4934568697ba1f577eecbca26c5dbfa32b0e2f0877948a9fb46a122e4214947386f467de11c')
|
||||
#'40048111227ff6407070611535000a0500030002027b6abed3'
|
||||
submit = SMS_DELIVER.from_bytes(tpdu)
|
||||
submit.tp_udhi = True
|
||||
print(submit)
|
||||
#print("UD: %s" % b2h(submit.tp_ud))
|
||||
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
|
||||
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
|
||||
print("UDHD: %s" % udhd)
|
||||
print("DATA: %s" % b2h(data))
|
||||
tpdu2 = h2b('40048111227ff6407070611535000a0500030002027b6abed3')
|
||||
submit2 = SMS_DELIVER.from_bytes(tpdu2)
|
||||
print(submit2)
|
||||
udhd2, data2 = UserDataHeader.from_bytes(submit2.tp_ud)
|
||||
print("UDHD: %s" % udhd2)
|
||||
print("DATA: %s" % b2h(data2))
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data + data2)
|
||||
print(b2h(dec_tar), b2h(dec_apdu))
|
||||
|
||||
def test_open_channel2(self):
|
||||
spi = self.spi_base
|
||||
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('000102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('101112131415161718191a1b1c1d1e1f'))
|
||||
tpdu = h2b('40048111227ff6407070611535004d02700000481516011212000001fe4c0943aea42e45021c078ae06c66afc09303608874b72f58bacadb0dcf665c29349c799fbb522e61709c9baf1890015e8e8e196e36153106c8b92f95153774')
|
||||
submit = SMS_DELIVER.from_bytes(tpdu)
|
||||
submit.tp_udhi = True
|
||||
print(submit)
|
||||
#print("UD: %s" % b2h(submit.tp_ud))
|
||||
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
|
||||
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
|
||||
print("UDHD: %s" % udhd)
|
||||
print("DATA: %s" % b2h(data))
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data)
|
||||
print(b2h(dec_tar), b2h(dec_apdu))
|
||||
|
||||
class Test_SMS_3DES(unittest.TestCase):
|
||||
tar = h2b('b00000')
|
||||
@@ -241,13 +178,6 @@ OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
kic=h2b('200102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('201102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
OTA_KEYSET_eSIM_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('207c5d2c1aa80d58cd8f542fb9ef2f80'),
|
||||
kid=h2b('312367f1681902fd67d9a71c62a840e3'))
|
||||
OTA_KEYSET_eSIM_AES128.cntr = 2
|
||||
|
||||
|
||||
class OtaTestCase(unittest.TestCase):
|
||||
def __init__(self, methodName='runTest', **kwargs):
|
||||
super().__init__(methodName, **kwargs)
|
||||
@@ -255,7 +185,6 @@ class OtaTestCase(unittest.TestCase):
|
||||
# SIM RFM: B00010
|
||||
# USIM RFM: B00011
|
||||
self.tar = h2b('B00011')
|
||||
self.tar = h2b('B00000')
|
||||
|
||||
class SmsOtaTestCase(OtaTestCase):
|
||||
# Array describing the input/output data for the tests. We use the
|
||||
@@ -264,102 +193,70 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
# manually writing one class per test.
|
||||
testdatasets = [
|
||||
{
|
||||
# 'name': '3DES-SJA5-CIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': 'AES128-SJA5-CIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_AES128,
|
||||
# 'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
'name': 'AES128-eSIM-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_eSIM_AES128,
|
||||
'spi': SPI_CC_UNCIPHERED,
|
||||
'name': '3DES-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
|
||||
'encoded_cmd': '1502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
|
||||
'encoded_tpdu': '40048111227FF6407070611535002702700000221502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100000B0AB000000000000000000A9000',
|
||||
'response_status': 'insufficient_security_level',
|
||||
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
}, {
|
||||
'name': 'AES128-eSIM-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_eSIM_AES128,
|
||||
'spi': SPI_CC_CTR_POR_UNCIPHERED_CC,
|
||||
'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
|
||||
'encoded_cmd': '00281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
|
||||
'encoded_tpdu': '40048111227FF6407070611535002D02700000281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001C12B00000000000000200007DB7086951F7DB55AB0780010123026D009000',
|
||||
#'encoded_resp': '027100000b0ab0000100000000000006'
|
||||
#'encoded_resp': '027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c'
|
||||
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
}, {
|
||||
'name': 'AES128-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_AES128,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
},
|
||||
|
||||
# TODO: AES192
|
||||
# TODO: AES256
|
||||
]
|
||||
@@ -375,7 +272,7 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
kset = t['ota_keyset']
|
||||
outp = self.dialect.encode_cmd(kset, self.tar, t['spi'], apdu=t['request']['apdu'])
|
||||
#print("result: %s" % b2h(outp))
|
||||
self.assertEqual(b2h(outp), t['request']['encoded_cmd'].lower())
|
||||
self.assertEqual(b2h(outp), t['request']['encoded_cmd'])
|
||||
|
||||
with_udh = b'\x02\x70\x00' + outp
|
||||
#print("with_udh: %s" % b2h(with_udh))
|
||||
@@ -384,7 +281,7 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
|
||||
#print("TPDU: %s" % tpdu)
|
||||
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
|
||||
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'].lower())
|
||||
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
|
||||
|
||||
# also test decoder
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(kset, outp)
|
||||
@@ -399,12 +296,9 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
r, d = self.dialect.decode_resp(kset, t['spi'], t['response']['encoded_resp'])
|
||||
#print("RESP: %s / %s" % (r, d))
|
||||
self.assertEqual(r.response_status, t['response']['response_status'])
|
||||
if 'number_of_commands' in t['response']:
|
||||
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
|
||||
if 'last_status_word' in t['response']:
|
||||
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
|
||||
if 'last_response_data' in t['response']:
|
||||
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
|
||||
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
|
||||
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
|
||||
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
301
vpcd2smpp.py
301
vpcd2smpp.py
@@ -1,301 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# This program receive APDUs via the VPCD protocol of Frank Morgner's
|
||||
# virtualsmartcard, encrypts them with OTA (over the air) keys and
|
||||
# forwards them via SMPP to a SMSC (SMS service centre).
|
||||
#
|
||||
# In other words, you can use it as a poor man's OTA server, to enable
|
||||
# you to use unmodified application software with PC/SC support to talk
|
||||
# securely via OTA with a remote SMS card.
|
||||
#
|
||||
# This is very much a work in progress at this point.
|
||||
|
||||
#######################################################################
|
||||
# twisted VPCD Library
|
||||
#######################################################################
|
||||
|
||||
import logging
|
||||
import struct
|
||||
import abc
|
||||
from typing import Union, Optional
|
||||
from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
|
||||
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class VirtualCard(abc.ABC):
|
||||
"""Abstract base class for a virtual smart card."""
|
||||
def __init__(self, atr: Union[str, bytes]):
|
||||
if isinstance(atr, str):
|
||||
atr = h2b(atr)
|
||||
self.atr = atr
|
||||
|
||||
@abc.abstractmethod
|
||||
def power_change(self, new_state: bool):
|
||||
"""Power the card on or off."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self):
|
||||
"""Reset the card."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def rx_c_apdu(self, apdu: bytes):
|
||||
"""Receive a C-APDU from the reader/application."""
|
||||
pass
|
||||
|
||||
def tx_r_apdu(self, apdu: Union[str, bytes]):
|
||||
if isinstance(apdu, str):
|
||||
apdu = h2b(apdu)
|
||||
logger.info("R-APDU: %s" % b2h(apdu))
|
||||
self.protocol.send_data(apdu)
|
||||
|
||||
class VpcdProtocolBase(Protocol):
|
||||
# Prefixed couldn't be used as the this.length wouldn't be available in this case
|
||||
construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
|
||||
'data'/If(this.length > 1, Bytes(this.length)),
|
||||
'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
|
||||
def __init__(self, vcard: VirtualCard):
|
||||
self.recvBuffer = b''
|
||||
self.connectionCorrupted = False
|
||||
self.pduReadTimer = None
|
||||
self.pduReadTimerSecs = 10
|
||||
self.callLater = reactor.callLater
|
||||
self.on = False
|
||||
self.vcard = vcard
|
||||
self.vcard.protocol = self
|
||||
|
||||
def dataReceived(self, data: bytes):
|
||||
"""entry point where twisted tells us data was received."""
|
||||
#logger.debug('Data received: %s' % b2h(data))
|
||||
self.recvBuffer = self.recvBuffer + data
|
||||
while True:
|
||||
if self.connectionCorrupted:
|
||||
return
|
||||
msg = self.readMessage()
|
||||
if msg is None:
|
||||
break
|
||||
self.endPDURead()
|
||||
self.rawMessageReceived(msg)
|
||||
|
||||
if len(self.recvBuffer) > 0:
|
||||
self.incompletePDURead()
|
||||
|
||||
def incompletePDURead(self):
|
||||
"""We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
|
||||
if self.pduReadTimer and self.pduReadTimer.active():
|
||||
return
|
||||
self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
|
||||
|
||||
def endPDURead(self):
|
||||
"""We completed reading a PDU, cancel the pduReadTimer."""
|
||||
if self.pduReadTimer and self.pduReadTimer.active():
|
||||
self.pduReadTimer.cancel()
|
||||
|
||||
def readMessage(self) -> Optional[bytes]:
|
||||
"""read an entire [raw] message."""
|
||||
pduLen = self._getMessageLength()
|
||||
if pduLen is None:
|
||||
return None
|
||||
return self._getMessage(pduLen)
|
||||
|
||||
def _getMessageLength(self) -> Optional[int]:
|
||||
if len(self.recvBuffer) < 2:
|
||||
return None
|
||||
return struct.unpack('!H', self.recvBuffer[:2])[0]
|
||||
|
||||
def _getMessage(self, pduLen: int) -> Optional[bytes]:
|
||||
if len(self.recvBuffer) < pduLen+2:
|
||||
return None
|
||||
|
||||
message = self.recvBuffer[:pduLen+2]
|
||||
self.recvBuffer = self.recvBuffer[pduLen+2:]
|
||||
return message
|
||||
|
||||
def onPDUReadTimeout(self):
|
||||
logger.error('PDU read timed out. Buffer is now considered corrupt')
|
||||
#self.coruptDataReceived
|
||||
|
||||
def rawMessageReceived(self, message: bytes):
|
||||
"""Called once a complete binary vpcd message has been received."""
|
||||
pdu = None
|
||||
try:
|
||||
pdu = VpcdProtocolBase.construct.parse(message)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.critical('Received corrupt PDU %s' % b2h(message))
|
||||
#self.corupDataRecvd()
|
||||
else:
|
||||
self.PDUReceived(pdu)
|
||||
|
||||
def PDUReceived(self, pdu):
|
||||
logger.debug("Rx PDU: %s" % pdu)
|
||||
if pdu['data']:
|
||||
return self.on_rx_data(pdu)
|
||||
else:
|
||||
method = getattr(self, 'on_rx_' + pdu['ctrl'])
|
||||
return method(pdu)
|
||||
|
||||
def on_rx_atr(self, pdu):
|
||||
self.send_data(self.vcard.atr)
|
||||
|
||||
def on_rx_on(self, pdu):
|
||||
if self.on:
|
||||
return
|
||||
else:
|
||||
self.on = True
|
||||
self.vcard.power_change(self.on)
|
||||
|
||||
def on_rx_reset(self, pdu):
|
||||
self.vcard.reset()
|
||||
|
||||
def on_rx_off(self, pdu):
|
||||
if not self.on:
|
||||
return
|
||||
else:
|
||||
self.on = False
|
||||
self.vcard.power_change(self.on)
|
||||
|
||||
def on_rx_data(self, pdu):
|
||||
self.vcard.rx_c_apdu(pdu['data'])
|
||||
|
||||
def send_pdu(self, pdu):
|
||||
logger.debug("Sending PDU: %s" % pdu)
|
||||
encoded = VpcdProtocolBase.construct.build(pdu)
|
||||
#logger.debug("Sending binary: %s" % b2h(encoded))
|
||||
self.transport.write(encoded)
|
||||
|
||||
def send_data(self, data: Union[str, bytes]):
|
||||
if isinstance(data, str):
|
||||
data = h2b(data)
|
||||
return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
|
||||
|
||||
def send_ctrl(self, ctrl: str):
|
||||
return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
|
||||
|
||||
|
||||
class VpcdProtocolClient(VpcdProtocolBase):
|
||||
pass
|
||||
|
||||
|
||||
class VpcdClientFactory(ReconnectingClientFactory):
|
||||
def __init__(self, vcard_class: VirtualCard):
|
||||
self.vcard_class = vcard_class
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
logger.debug('Started to connect')
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
logger.info('Connection established to %s' % addr)
|
||||
self.resetDelay()
|
||||
return VpcdProtocolClient(vcard = self.vcard_class())
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
logger.warning('Connection lost (reason: %s)' % reason)
|
||||
super().clientConnectionLost(connector, reason)
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
logger.warning('Connection failed (reason: %s)' % reason)
|
||||
super().clientConnectionFailed(connector, reason)
|
||||
|
||||
#######################################################################
|
||||
# Application
|
||||
#######################################################################
|
||||
|
||||
from pprint import pprint as pp
|
||||
|
||||
from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
|
||||
from twisted.internet import reactor
|
||||
|
||||
from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
|
||||
from smpp.twisted.protocol import SMPPClientProtocol
|
||||
from smpp.twisted.config import SMPPClientConfig
|
||||
from smpp.pdu.operations import SubmitSM, DeliverSM
|
||||
from smpp.pdu import pdu_types
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
|
||||
class MyVcard(VirtualCard):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
|
||||
self.smpp_client = None
|
||||
# KIC1 + KID1 of 8988211000000467285
|
||||
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
|
||||
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
|
||||
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
|
||||
KID3 = h2b('12110C78E678C25408233076AA033615')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.tar = h2b('B00011')
|
||||
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
def ensure_smpp(self):
|
||||
config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
|
||||
if self.smpp_client:
|
||||
return
|
||||
self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
|
||||
smpp = self.smpp_client.connectAndBind()
|
||||
#self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
|
||||
#d = self.smpp.connectTCP(config.host, config.port)
|
||||
#d = self.smpp.connectAndBind()
|
||||
#d.addCallback(self.forwardToClient, self.smpp)
|
||||
|
||||
def power_change(self, new_state: bool):
|
||||
if new_state:
|
||||
logger.info("POWER ON")
|
||||
self.ensure_smpp()
|
||||
else:
|
||||
logger.info("POWER OFF")
|
||||
|
||||
def reset(self):
|
||||
logger.info("RESET")
|
||||
|
||||
def rx_c_apdu(self, apdu: bytes):
|
||||
pp(self.smpp_client.smpp)
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
#self.tx_r_apdu('9000')
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
"""Send a SMS TPDU via SMPP SubmitSM."""
|
||||
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
|
||||
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
|
||||
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
|
||||
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
|
||||
submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
|
||||
protocol_id=0x7f, short_message=tpdu)
|
||||
self.smpp_client.smpp.sendDataRequest(submit)
|
||||
|
||||
def handleSmpp(self, smpp, pdu):
|
||||
#logger.info("Received SMPP %s" % pdu)
|
||||
data = pdu.params['short_message']
|
||||
#logger.info("Received SMS Data %s" % b2h(data))
|
||||
r, d = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
|
||||
logger.info("Decoded SMPP %s" % r)
|
||||
self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
import colorlog
|
||||
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
|
||||
colorlog.basicConfig(level=logging.INFO, format = log_format)
|
||||
logger = colorlog.getLogger()
|
||||
|
||||
from twisted.internet import reactor
|
||||
host = 'localhost'
|
||||
port = 35963
|
||||
reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
|
||||
reactor.run()
|
||||
Reference in New Issue
Block a user