Compare commits
21 Commits
master
...
laforge/ot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b767a78935 | ||
|
|
89ff53922e | ||
|
|
9e9db415b9 | ||
|
|
3203f7d0ff | ||
|
|
bc23a2cc7b | ||
|
|
a5dd041f4c | ||
|
|
8e05d83913 | ||
|
|
4f73968bde | ||
|
|
f61196ace8 | ||
|
|
286d96c8ad | ||
|
|
165b145d48 | ||
|
|
e707a2fe0b | ||
|
|
dd2dc510d3 | ||
|
|
b4a5776815 | ||
|
|
227da5935f | ||
|
|
3fae277dfa | ||
|
|
d45123f6ac | ||
|
|
ad9c01f1c1 | ||
|
|
d2f1ee4c0e | ||
|
|
82c6575a77 | ||
|
|
dc5fdfca39 |
97
contrib/card2server.py
Executable file
97
contrib/card2server.py
Executable file
@@ -0,0 +1,97 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import argparse
|
||||
from osmocom.utils import b2h
|
||||
import websockets
|
||||
|
||||
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CardWsClientBlocking(WsClientBlocking):
|
||||
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, ws_uri, tp: LinkBase):
|
||||
super().__init__(ws_uri)
|
||||
self.tp = tp
|
||||
|
||||
def perform_outbound_hello(self):
|
||||
hello_data = {
|
||||
'client_type': 'card',
|
||||
'atr': b2h(self.tp.get_atr()),
|
||||
# TODO: include various card information in the HELLO message
|
||||
}
|
||||
super().perform_outbound_hello(hello_data)
|
||||
|
||||
def handle_rx_c_apdu(self, rx: dict):
|
||||
"""handle an inbound APDU transceive command"""
|
||||
data, sw = self.tp.send_apdu(rx['command'])
|
||||
tx = {
|
||||
'response': data,
|
||||
'sw': sw,
|
||||
}
|
||||
self.tx_json('r_apdu', tx)
|
||||
|
||||
def handle_rx_disconnect(self, rx: dict):
|
||||
"""server tells us to disconnect"""
|
||||
self.tx_json('disconnect_ack')
|
||||
# FIXME: tear down connection and/or terminate entire program
|
||||
|
||||
def handle_rx_print(self, rx: dict):
|
||||
"""print a message (text) given by server to the local console/log"""
|
||||
print(rx['message'])
|
||||
# no response
|
||||
|
||||
def handle_rx_reset_req(self, rx: dict):
|
||||
"""server tells us to reset the card"""
|
||||
self.tp.reset()
|
||||
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
argparse_add_reader_args(parser)
|
||||
parser.add_argument("--uri", default="ws://localhost:8765/",
|
||||
help="URI of the sever to which to connect")
|
||||
|
||||
opts = parser.parse_args()
|
||||
|
||||
# open the card reader / slot
|
||||
logger.info("Initializing Card Reader...")
|
||||
tp = init_reader(opts)
|
||||
logger.info("Connecting to Card...")
|
||||
tp.connect()
|
||||
scc = SimCardCommands(transport=tp)
|
||||
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
|
||||
|
||||
# TODO: gather various information about the card; print it
|
||||
|
||||
# create + connect the client to the server
|
||||
cl = CardWsClientBlocking(opts.uri, tp)
|
||||
logger.info("Connecting to remote server...")
|
||||
try:
|
||||
cl.connect()
|
||||
print("Successfully connected to Server")
|
||||
except ConnectionRefusedError as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# endless loop: wait for inbound command from server + execute it
|
||||
cl.rx_and_execute_cmd()
|
||||
# TODO: clean handling of websocket disconnect
|
||||
except websockets.exceptions.ConnectionClosedOK as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt as e:
|
||||
print(e.__class__.__name__)
|
||||
sys.exit(2)
|
||||
241
contrib/card_server.py
Executable file
241
contrib/card_server.py
Executable file
@@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional, Tuple
|
||||
from websockets.asyncio.server import serve
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
from osmocom.utils import Hexstr, swap_nibbles
|
||||
|
||||
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
|
||||
from pySim.exceptions import SwMatchError
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
card_clients = set()
|
||||
user_clients = set()
|
||||
|
||||
class WsClient:
|
||||
def __init__(self, websocket, hello: dict):
|
||||
self.websocket = websocket
|
||||
self.hello = hello
|
||||
self.identity = {}
|
||||
|
||||
def __str__(self):
|
||||
return '%s(ws=%s)' % (self.__class__.__name__, self.websocket)
|
||||
|
||||
async def rx_json(self):
|
||||
rx = await self.websocket.recv()
|
||||
rx_js = json.loads(rx)
|
||||
logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
async def tx_json(self, msg_type:str, d: dict = {}):
|
||||
"""Transmit a json-serializable dict to the peer"""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
logger.debug("Tx: %s", d_js)
|
||||
await self.websocket.send(d_js)
|
||||
|
||||
async def tx_hello_ack(self):
|
||||
await self.tx_json('hello_ack')
|
||||
|
||||
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
|
||||
await self.tx_json(msg_type, d)
|
||||
rx = await self.rx_json()
|
||||
if exp_msg_type:
|
||||
assert rx['msg_type'] == exp_msg_type
|
||||
return rx;
|
||||
|
||||
async def tx_error(self, message: str):
|
||||
"""Transmit an error message to the peer"""
|
||||
event = {
|
||||
"message": message,
|
||||
}
|
||||
await self.tx_json('error', event)
|
||||
|
||||
async def ws_hdlr(self):
|
||||
"""kind of a 'main' function for the websocket client: wait for incoming message,
|
||||
and handle it."""
|
||||
try:
|
||||
async for message in self.websocket:
|
||||
method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
|
||||
if not method:
|
||||
await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
|
||||
else:
|
||||
method(message)
|
||||
except ConnectionClosedError:
|
||||
# we handle this in the outer loop
|
||||
pass
|
||||
|
||||
class CardClient(WsClient):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.state = 'init'
|
||||
|
||||
def __str__(self):
|
||||
eid = self.identity.get('EID', None)
|
||||
if eid:
|
||||
return '%s(EID=%s)' % (self.__class__.__name__, eid)
|
||||
iccid = self.identity.get('ICCID', None)
|
||||
if iccid:
|
||||
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
|
||||
return super().__str__()
|
||||
|
||||
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
|
||||
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
"""transceive a single APDU with the card"""
|
||||
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return message['response'], message['sw']
|
||||
|
||||
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
|
||||
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
|
||||
prev_pdu = pdu
|
||||
data, sw = await self.xceive_apdu_raw(pdu)
|
||||
|
||||
if sw is not None:
|
||||
while (sw[0:2] in ['9f', '61', '62', '63']):
|
||||
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
|
||||
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
|
||||
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
|
||||
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
|
||||
prev_pdu = pdu_gr
|
||||
d, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
data += d
|
||||
if sw[0:2] == '6c':
|
||||
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
||||
pdu_gr = prev_pdu[0:8] + sw[2:4]
|
||||
data, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
|
||||
return data, sw
|
||||
|
||||
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
|
||||
"""like xceive_apdu, but checking the status word matches the expected pattern"""
|
||||
rv = await self.xceive_apdu(pdu)
|
||||
last_sw = rv[1]
|
||||
|
||||
if not sw_match(rv[1], sw):
|
||||
raise SwMatchError(rv[1], sw.lower())
|
||||
return rv
|
||||
|
||||
async def card_reset(self):
|
||||
"""reset the card"""
|
||||
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
|
||||
|
||||
async def get_iccid(self):
|
||||
"""high-level method to obtain the ICCID of the card"""
|
||||
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
|
||||
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
|
||||
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
|
||||
return dec_iccid(res)
|
||||
|
||||
async def get_eid_sgp22(self):
|
||||
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
|
||||
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
|
||||
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
|
||||
return res[-32:]
|
||||
|
||||
async def identify(self):
|
||||
# identify the card by asking for its EID and/or ICCID
|
||||
try:
|
||||
eid = await self.get_eid_sgp22()
|
||||
logger.debug("EID: %s", eid)
|
||||
self.identity['EID'] = eid
|
||||
except SwMatchError:
|
||||
pass
|
||||
try:
|
||||
iccid = await self.get_iccid()
|
||||
logger.debug("ICCID: %s", iccid)
|
||||
self.identity['ICCID'] = iccid
|
||||
except SwMatchError:
|
||||
pass
|
||||
logger.info("Card now in READY state")
|
||||
self.state = 'ready'
|
||||
|
||||
@staticmethod
|
||||
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
|
||||
for c in card_clients:
|
||||
print("testing card %s in state %s" % (c, c.state))
|
||||
if c.state != 'ready':
|
||||
continue
|
||||
c_id = c.identity.get(id_type.upper(), None)
|
||||
if c_id and c_id.lower() == id_str.lower():
|
||||
return c
|
||||
return None
|
||||
|
||||
class UserClient(WsClient):
|
||||
"""A websocket client representing a user application like pySim-shell."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.state = 'init'
|
||||
|
||||
async def state_init(self):
|
||||
"""Wait for incoming 'select_card' and process it."""
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'select_card':
|
||||
# look-up if the card can be found
|
||||
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
|
||||
if not card:
|
||||
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
|
||||
continue
|
||||
# transition to next statee
|
||||
self.state = 'associated'
|
||||
card.state = 'associated'
|
||||
self.card = card
|
||||
await self.tx_json('select_card_ack', {'identities': card.identity})
|
||||
break
|
||||
else:
|
||||
self.tx_error('Unknown message type %s' % rx['msg_type'])
|
||||
|
||||
async def state_selected(self):
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'c_apdu':
|
||||
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
|
||||
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
|
||||
|
||||
|
||||
async def ws_conn_hdlr(websocket):
|
||||
rx_raw = await websocket.recv()
|
||||
rx = json.loads(rx_raw)
|
||||
assert rx['msg_type'] == 'hello'
|
||||
client_type = rx['client_type']
|
||||
logger.info("New client (type %s) connection accepted", client_type)
|
||||
|
||||
if client_type == 'card':
|
||||
card = CardClient(websocket, rx)
|
||||
await card.tx_hello_ack()
|
||||
card_clients.add(card)
|
||||
# first obtain the identity of the card
|
||||
await card.identify()
|
||||
# then go into the "main loop"
|
||||
try:
|
||||
await card.ws_hdlr()
|
||||
finally:
|
||||
logger.info("%s: connection closed", card)
|
||||
card_clients.remove(card)
|
||||
elif client_type == 'user':
|
||||
user = UserClient(websocket, rx)
|
||||
await user.tx_hello_ack()
|
||||
user_clients.add(user)
|
||||
# first wait for the user to specify the select the card
|
||||
await user.state_init()
|
||||
try:
|
||||
await user.state_selected()
|
||||
finally:
|
||||
logger.info("%s: connection closed", user)
|
||||
user_clients.remove(user)
|
||||
else:
|
||||
logger.info("Rejecting client (unknown type %s) connection", client_type)
|
||||
raise ValueError
|
||||
|
||||
|
||||
async def main():
|
||||
async with serve(ws_conn_hdlr, "localhost", 8765):
|
||||
await asyncio.get_running_loop().create_future() # run forever
|
||||
|
||||
asyncio.run(main())
|
||||
206
contrib/fsdump2saip.py
Executable file
206
contrib/fsdump2saip.py
Executable file
@@ -0,0 +1,206 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
# This is a script to generate a [partial] eSIM profile from the 'fsdump' of another USIM/ISIM. This is
|
||||
# useful to generate an "as close as possible" eSIM from a physical USIM, as far as that is possible
|
||||
# programmatically and in a portable way.
|
||||
#
|
||||
# Of course, this really only affects the file sytem aspects of the card. It's not possible
|
||||
# to read the K/OPc or other authentication related parameters off a random USIM, and hence
|
||||
# we cannot replicate that. Similarly, it's not possible to export the java applets from a USIM,
|
||||
# and hence we cannot replicate those.
|
||||
|
||||
import argparse
|
||||
|
||||
from pySim.esim.saip import *
|
||||
from pySim.ts_102_221 import *
|
||||
|
||||
class FsdumpToSaip:
|
||||
def __init__(self, pes: ProfileElementSequence):
|
||||
self.pes = pes
|
||||
|
||||
@staticmethod
|
||||
def fcp_raw2saip_fcp(fname:str, fcp_raw: bytes) -> Dict:
|
||||
"""Convert a raw TLV-encoded FCP to a SAIP dict-format (as needed by asn1encode)."""
|
||||
ftype = fname.split('.')[0]
|
||||
# use the raw FCP as basis so we don't get stuck with potentially old decoder bugs
|
||||
# ore future format incompatibilities
|
||||
fcp = FcpTemplate()
|
||||
fcp.from_tlv(fcp_raw)
|
||||
r = {}
|
||||
|
||||
r['fileDescriptor'] = fcp.child_by_type(FileDescriptor).to_bytes()
|
||||
|
||||
file_id = fcp.child_by_type(FileIdentifier)
|
||||
if file_id:
|
||||
r['fileID'] = file_id.to_bytes()
|
||||
else:
|
||||
if ftype in ['ADF']:
|
||||
print('%s is an ADF but has no [mandatory] file_id!' % fname)
|
||||
#raise ValueError('%s is an ADF but has no [mandatory] file_id!' % fname)
|
||||
r['fileID'] = b'\x7f\xff' # FIXME: auto-increment
|
||||
|
||||
df_name = fcp.child_by_type(DfName)
|
||||
if ftype in ['ADF']:
|
||||
if not df_name:
|
||||
raise ValueError('%s is an ADF but has no [mandatory] df_name!' % fname)
|
||||
r['dfName'] = df_name.to_bytes()
|
||||
|
||||
lcsi_byte = fcp.child_by_type(LifeCycleStatusInteger).to_bytes()
|
||||
if lcsi_byte != b'\x05':
|
||||
r['lcsi'] = lcsi_byte
|
||||
|
||||
sa_ref = fcp.child_by_type(SecurityAttribReferenced)
|
||||
if sa_ref:
|
||||
r['securityAttributesReferenced'] = sa_ref.to_bytes()
|
||||
|
||||
file_size = fcp.child_by_type(FileSize)
|
||||
if ftype in ['EF']:
|
||||
if file_size:
|
||||
r['efFileSize'] = file_size.to_bytes()
|
||||
|
||||
psdo = fcp.child_by_type(PinStatusTemplate_DO)
|
||||
if ftype in ['MF', 'ADF', 'DF']:
|
||||
if not psdo:
|
||||
raise ValueError('%s is an %s but has no [mandatory] PinStatusTemplateDO' % fname)
|
||||
else:
|
||||
r['pinStatusTemplateDO'] = psdo.to_bytes()
|
||||
|
||||
sfid = fcp.child_by_type(ShortFileIdentifier)
|
||||
if sfid and sfid.decoded:
|
||||
if ftype not in ['EF']:
|
||||
raise ValueError('%s is not an EF but has [forbidden] shortEFID' % fname)
|
||||
r['shortEFID'] = sfid.to_bytes()
|
||||
|
||||
pinfo = fcp.child_by_type(ProprietaryInformation)
|
||||
if pinfo and ftype in ['EF']:
|
||||
spinfo = pinfo.child_by_type(SpecialFileInfo)
|
||||
fill_p = pinfo.child_by_type(FillingPattern)
|
||||
repeat_p = pinfo.child_by_type(RepeatPattern)
|
||||
# only exists for BER-TLV files
|
||||
max_fsize = pinfo.child_by_type(MaximumFileSize)
|
||||
|
||||
if spinfo or fill_p or repeat_p or max_fsize:
|
||||
r['proprietaryEFInfo'] = {}
|
||||
if spinfo:
|
||||
r['proprietaryEFInfo']['specialFileInformation'] = spinfo.to_bytes()
|
||||
if fill_p:
|
||||
r['proprietaryEFInfo']['fillPattern'] = fill_p.to_bytes()
|
||||
if repeat_p:
|
||||
r['proprietaryEFInfo']['repeatPattern'] = repeat_p.to_bytes()
|
||||
if max_fsize:
|
||||
r['proprietaryEFInfo']['maximumFileSize'] = max_fsize.to_bytes()
|
||||
|
||||
# TODO: linkPath
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def fcp_fsdump2saip(fsdump_ef: Dict):
|
||||
"""Convert a file from its "fsdump" representation to the SAIP representation of a File type
|
||||
in the decoded format as used by the asn1tools-generated codec."""
|
||||
# first convert the FCP
|
||||
path = fsdump_ef['path']
|
||||
fdesc = FsdumpToSaip.fcp_raw2saip_fcp(path[-1], h2b(fsdump_ef['fcp_raw']))
|
||||
r = [
|
||||
('fileDescriptor', fdesc),
|
||||
]
|
||||
# then convert the body. We're doing a straight-forward conversion without any optimization
|
||||
# like not encoding all-ff files. This should be done by a subsequent optimizer
|
||||
if 'body' in fsdump_ef and fsdump_ef['body']:
|
||||
if isinstance(fsdump_ef['body'], list):
|
||||
for b_seg in fsdump_ef['body']:
|
||||
r.append(('fillFileContent', h2b(b_seg)))
|
||||
else:
|
||||
r.append(('fillFileContent', h2b(fsdump_ef['body'])))
|
||||
print(fsdump_ef['path'])
|
||||
return r
|
||||
|
||||
def add_file_from_fsdump(self, fsdump_ef: Dict):
|
||||
fid = int(fsdump_ef['fcp']['file_identifier'])
|
||||
# determine NAA
|
||||
if fsdump_ef['path'][0:1] == ['MF', 'ADF.USIM']:
|
||||
naa = NaaUsim
|
||||
elif fsdump_ef['path'][0:1] == ['MF', 'ADF.ISIM']:
|
||||
naa = NaaIsim
|
||||
else:
|
||||
print("Unable to determine NAA for %s" % fsdump_ef['path'])
|
||||
return
|
||||
pes.pes_by_naa[naa.name]
|
||||
for pe in pes:
|
||||
print("PE %s" % pe)
|
||||
if not isinstance(pe, FsProfileElement):
|
||||
print("Skipping PE %s" % pe)
|
||||
continue
|
||||
if not pe.template:
|
||||
print("No template for PE %s" % pe )
|
||||
continue
|
||||
if not fid in pe.template.files_by_fid:
|
||||
print("File %04x not available in template; must create it using GenericFileMgmt" % fid)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('fsdump', help='')
|
||||
|
||||
def has_unsupported_path_prefix(path: List[str]) -> bool:
|
||||
# skip some paths from export as they don't exist in an eSIM profile
|
||||
UNSUPPORTED_PATHS = [
|
||||
['MF', 'DF.GSM'],
|
||||
]
|
||||
for p in UNSUPPORTED_PATHS:
|
||||
prefix = path[:len(p)]
|
||||
if prefix == p:
|
||||
return True
|
||||
# any ADF not USIM or ISIM are unsupported
|
||||
SUPPORTED_ADFS = [ 'ADF.USIM', 'ADF.ISIM' ]
|
||||
if len(path) == 2 and path[0] == 'MF' and path[1].startswith('ADF.') and path[1] not in SUPPORTED_ADFS:
|
||||
return True
|
||||
return False
|
||||
|
||||
import traceback
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
with open(opts.fsdump, 'r') as f:
|
||||
fsdump = json.loads(f.read())
|
||||
|
||||
pes = ProfileElementSequence()
|
||||
|
||||
# FIXME: fsdump has strting-name path, but we need FID-list path for ProfileElementSequence
|
||||
for path, fsdump_ef in fsdump['files'].items():
|
||||
print("=" * 80)
|
||||
#print(fsdump_ef)
|
||||
if not 'fcp_raw' in fsdump_ef:
|
||||
continue
|
||||
if has_unsupported_path_prefix(fsdump_ef['path']):
|
||||
print("Skipping eSIM-unsupported path %s" % ('/'.join(fsdump_ef['path'])))
|
||||
continue
|
||||
saip_dec = FsdumpToSaip.fcp_fsdump2saip(fsdump_ef)
|
||||
#print(saip_dec)
|
||||
try:
|
||||
f = pes.add_file_at_path(Path(path), saip_dec)
|
||||
print(repr(f))
|
||||
except Exception as e:
|
||||
print("EXCEPTION: %s" % traceback.format_exc())
|
||||
#print("EXCEPTION: %s" % e)
|
||||
continue
|
||||
|
||||
print("=== Tree === ")
|
||||
pes.mf.print_tree()
|
||||
|
||||
# FIXME: export the actual PE Sequence
|
||||
|
||||
@@ -48,6 +48,7 @@ pySim consists of several parts:
|
||||
sim-rest
|
||||
suci-keytool
|
||||
saip-tool
|
||||
wsrc
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
||||
31
docs/wsrc.rst
Normal file
31
docs/wsrc.rst
Normal file
@@ -0,0 +1,31 @@
|
||||
WebSocket Remote Card (WSRC)
|
||||
============================
|
||||
|
||||
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
|
||||
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
|
||||
method was chosen to be as firewall/NAT friendly as possible.
|
||||
|
||||
WSRC Network Architecture
|
||||
-------------------------
|
||||
|
||||
In a WSRC network, there are three major elements:
|
||||
|
||||
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
|
||||
to a remote *WSRC Server*
|
||||
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
|
||||
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
|
||||
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
|
||||
|
||||
WSRC Protocol
|
||||
-------------
|
||||
|
||||
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
|
||||
is based on HTTP and should usually operate via TLS for security reasons.
|
||||
|
||||
The detailed protocol is currently still WIP. The plan is to document it here.
|
||||
|
||||
|
||||
pySim implementations
|
||||
---------------------
|
||||
|
||||
TBD
|
||||
154
ota_test.py
Executable file
154
ota_test.py
Executable file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from pySim.ota import *
|
||||
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
|
||||
from pySim.utils import h2b, h2b
|
||||
|
||||
# pre-defined SPI values for use in test cases below
|
||||
SPI_CC_POR_CIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_NOCC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'no_rc_cc_ds',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
# SJA5 SAMPLE cards provisioned by execute_ipr.py
|
||||
OTA_KEYSET_SJA5_SAMPLES = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3,
|
||||
kic=h2b('300102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('301102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
algo_auth='aes_cmac', kid_idx=2,
|
||||
kic=h2b('200102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('201102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
# TS.48 profile on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
algo_auth='aes_cmac', kid_idx=2,
|
||||
kic=h2b('66778899AABBCCDD1122334455EEFF10'),
|
||||
kid=h2b('112233445566778899AABBCCDDEEFF10'))
|
||||
|
||||
|
||||
# ISD-R on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128_ISDR = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('B52F9C5938D1C19ED73E1AE772937FD7'),
|
||||
kid=h2b('3BC696ACD1EEC95A6624F7330D22FC81'))
|
||||
|
||||
# ISD-A on sysmoEUICC1-C2G
|
||||
OTA_KEYSET_C2G_AES128_ISDA = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('8DAAD1DAAA8D7C9000E3BBED8B7556E7'),
|
||||
kid=h2b('5392D503AE050DDEAF81AFAEFF275A2B'))
|
||||
|
||||
# TODO: AES192
|
||||
# TODO: AES256
|
||||
|
||||
testcases = [
|
||||
{
|
||||
'name': '3DES-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
}
|
||||
}, {
|
||||
'name': 'AES128-C2G-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_C2G_AES128_ISDR,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
#'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'apdu': h2b('80ec800300'),
|
||||
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
for t in testcases:
|
||||
print()
|
||||
print("==== TESTCASE: %s" % t['name'])
|
||||
od = t['ota_keyset']
|
||||
|
||||
# RAM: B00000
|
||||
# SIM RFM: B00010
|
||||
# USIM RFM: B00011
|
||||
# ISD-R: 000001
|
||||
# ECASD: 000002
|
||||
tar = h2b('000001')
|
||||
|
||||
dialect = OtaDialectSms()
|
||||
outp = dialect.encode_cmd(od, tar, t['spi'], apdu=t['request']['apdu'])
|
||||
print("result: %s" % b2h(outp))
|
||||
#assert(b2h(outp) == t['request']['encoded_cmd'])
|
||||
|
||||
with_udh = b'\x02\x70\x00' + outp
|
||||
print("with_udh: %s" % b2h(with_udh))
|
||||
|
||||
|
||||
# processing of the response from the card
|
||||
da = AddressField('12345678', 'unknown', 'isdn_e164')
|
||||
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
|
||||
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
|
||||
print("TPDU: %s" % tpdu)
|
||||
print("tpdu: %s" % b2h(tpdu.to_bytes()))
|
||||
#assert(b2h(tpdu.to_bytes()) == t['request']['encoded_tpdu'])
|
||||
|
||||
r = dialect.decode_resp(od, t['spi'], t['response']['encoded_resp'])
|
||||
print("RESP: ", r)
|
||||
@@ -23,6 +23,7 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
|
||||
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
|
||||
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
|
||||
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
|
||||
from pySim.apdu_source.stdin_hex import StdinHexApduSource
|
||||
|
||||
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
|
||||
|
||||
@@ -32,10 +33,11 @@ logger = colorlog.getLogger()
|
||||
|
||||
# merge all of the command sets into one global set. This will override instructions,
|
||||
# the one from the 'last' set in the addition below will prevail.
|
||||
from pySim.apdu.ts_51_011 import ApduCommands as SimApduCommands
|
||||
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
|
||||
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
|
||||
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
|
||||
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
|
||||
ApduCommands = SimApduCommands + UiccApduCommands + UsimApduCommands #+ GpApduCommands
|
||||
|
||||
|
||||
class DummySimLink(LinkBase):
|
||||
@@ -190,6 +192,10 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||
help='Name of te log file to be read')
|
||||
|
||||
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
|
||||
Read APDUs as hex-string from stdin.""")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
opts = option_parser.parse_args()
|
||||
@@ -205,6 +211,8 @@ if __name__ == '__main__':
|
||||
s = PysharkGsmtapPcap(opts.pcap_file)
|
||||
elif opts.source == 'tca-loader-log':
|
||||
s = TcaLoaderLogApduSource(opts.log_file)
|
||||
elif opts.source == 'stdin-hex':
|
||||
s = StdinHexApduSource()
|
||||
else:
|
||||
raise ValueError("unsupported source %s", opts.source)
|
||||
|
||||
|
||||
339
pySim/apdu/ts_51_011.py
Normal file
339
pySim/apdu/ts_51_011.py
Normal file
@@ -0,0 +1,339 @@
|
||||
# coding=utf-8
|
||||
"""APDU definitions/decoders of 3GPP TS 51.011, the classic SIM spec.
|
||||
|
||||
(C) 2022 by Harald Welte <laforge@osmocom.org>
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from construct import GreedyRange, Struct
|
||||
from pySim.construct import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.runtime import RuntimeLchan
|
||||
from pySim.apdu import ApduCommand, ApduCommandSet
|
||||
from typing import Optional, Dict, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# TS 51.011 Section 9.2.1
|
||||
class SimSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
|
||||
for file in path:
|
||||
file_hex = b2h(file)
|
||||
sels = lchan.selected_file.get_selectables(['FIDS'])
|
||||
if file_hex in sels:
|
||||
if self.successful:
|
||||
#print("\tSELECT %s" % sels[file_hex])
|
||||
lchan.selected_file = sels[file_hex]
|
||||
else:
|
||||
#print("\tSELECT %s FAILED" % sels[file_hex])
|
||||
pass
|
||||
continue
|
||||
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
|
||||
if len(self.cmd_data) != 2:
|
||||
raise ValueError('Expecting a 2-byte FID')
|
||||
|
||||
# decode the SELECT response
|
||||
if self.successful:
|
||||
self.file = lchan.selected_file
|
||||
if 'body' in self.rsp_dict:
|
||||
# not every SELECT is asking for the FCP in response...
|
||||
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
|
||||
return None
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.2
|
||||
class SimStatus(ApduCommand, n='STATUS', ins=0xF2, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
if self.successful:
|
||||
if 'body' in self.rsp_dict:
|
||||
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
|
||||
|
||||
def _decode_binary_p1p2(p1, p2) -> Dict:
|
||||
ret = {}
|
||||
if p1 & 0x80:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = p1 & 0x1f
|
||||
ret['offset'] = p2
|
||||
else:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
ret['offset'] = ((p1 & 0x7f) << 8) & p2
|
||||
return ret
|
||||
|
||||
# TS 51.011 Section 9.2.3 / 31.101
|
||||
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
def _decode_p1p2(self):
|
||||
return _decode_binary_p1p2(self.p1, self.p2)
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, TransparentEF):
|
||||
return b2h(self.rsp_data)
|
||||
# our decoders don't work for non-zero offsets / short reads
|
||||
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
|
||||
return b2h(self.rsp_data)
|
||||
method = getattr(self.file, 'decode_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.rsp_data)
|
||||
|
||||
# TS 51.011 Section 9.2.4 / 31.101
|
||||
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
def _decode_p1p2(self):
|
||||
return _decode_binary_p1p2(self.p1, self.p2)
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, TransparentEF):
|
||||
return b2h(self.rsp_data)
|
||||
# our decoders don't work for non-zero offsets / short writes
|
||||
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
|
||||
return b2h(self.cmd_data)
|
||||
method = getattr(self.file, 'decode_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.cmd_data)
|
||||
|
||||
def _decode_record_p1p2(p1, p2):
|
||||
ret = {}
|
||||
ret['record_number'] = p1
|
||||
if p2 >> 3 == 0:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
else:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = p2 >> 3
|
||||
mode = p2 & 0x7
|
||||
if mode == 2:
|
||||
ret['mode'] = 'next_record'
|
||||
elif mode == 3:
|
||||
ret['mode'] = 'previous_record'
|
||||
elif mode == 8:
|
||||
ret['mode'] = 'absolute_current'
|
||||
return ret
|
||||
|
||||
# TS 51.011 Section 9.2.5
|
||||
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
def _decode_p1p2(self):
|
||||
r = _decode_record_p1p2(self.p1, self.p2)
|
||||
self.col_id = '%02u' % r['record_number']
|
||||
return r
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, LinFixedEF):
|
||||
return b2h(self.rsp_data)
|
||||
method = getattr(self.file, 'decode_record_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.rsp_data)
|
||||
|
||||
# TS 51.011 Section 9.2.6
|
||||
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
def _decode_p1p2(self):
|
||||
r = _decode_record_p1p2(self.p1, self.p2)
|
||||
self.col_id = '%02u' % r['record_number']
|
||||
return r
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
if not isinstance(self.file, LinFixedEF):
|
||||
return b2h(self.cmd_data)
|
||||
method = getattr(self.file, 'decode_record_bin', None)
|
||||
if self.successful and callable(method):
|
||||
return method(self.cmd_data)
|
||||
|
||||
# TS 51.011 Section 9.2.7
|
||||
class Seek(ApduCommand, n='SEEK', ins=0xA2, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
_construct_rsp = GreedyRange(Int8ub)
|
||||
|
||||
def _decode_p1p2(self):
|
||||
ret = {}
|
||||
sfi = self.p2 >> 3
|
||||
if sfi == 0:
|
||||
ret['file'] = 'currently_selected_ef'
|
||||
else:
|
||||
ret['file'] = 'sfi'
|
||||
ret['sfi'] = sfi
|
||||
mode = self.p2 & 0x7
|
||||
if mode in [0x4, 0x5]:
|
||||
if mode == 0x4:
|
||||
ret['mode'] = 'forward_search'
|
||||
else:
|
||||
ret['mode'] = 'backward_search'
|
||||
ret['record_number'] = self.p1
|
||||
self.col_id = '%02u' % ret['record_number']
|
||||
elif mode == 6:
|
||||
ret['mode'] = 'enhanced_search'
|
||||
# TODO: further decode
|
||||
elif mode == 7:
|
||||
ret['mode'] = 'proprietary_search'
|
||||
return ret
|
||||
|
||||
def _decode_cmd(self):
|
||||
ret = self._decode_p1p2()
|
||||
if self.cmd_data:
|
||||
if ret['mode'] == 'enhanced_search':
|
||||
ret['search_indication'] = b2h(self.cmd_data[:2])
|
||||
ret['search_string'] = b2h(self.cmd_data[2:])
|
||||
else:
|
||||
ret['search_string'] = b2h(self.cmd_data)
|
||||
return ret
|
||||
|
||||
def process_on_lchan(self, lchan):
|
||||
self._determine_file(lchan)
|
||||
return self.to_dict()
|
||||
|
||||
# TS 51.011 Section 9.2.8
|
||||
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
|
||||
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
|
||||
|
||||
# TS 51.011 Section 9.2.9
|
||||
class VerifyChv(ApduCommand, n='VERIFY CHV', ins=0x20, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
@staticmethod
|
||||
def _pin_process(apdu):
|
||||
processed = {
|
||||
'scope': apdu.cmd_dict['p2']['scope'],
|
||||
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
|
||||
}
|
||||
if apdu.lc == 0:
|
||||
# this is just a question on the counters remaining
|
||||
processed['mode'] = 'check_remaining_attempts'
|
||||
else:
|
||||
processed['pin'] = b2h(apdu.cmd_data)
|
||||
if apdu.sw[0] == 0x63:
|
||||
processed['remaining_attempts'] = apdu.sw[1] & 0xf
|
||||
return processed
|
||||
|
||||
@staticmethod
|
||||
def _pin_is_success(sw):
|
||||
if sw[0] == 0x63:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.10
|
||||
class ChangeChv(ApduCommand, n='CHANGE CHV', ins=0x24, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.11
|
||||
class DisableChv(ApduCommand, n='DISABLE CHV', ins=0x26, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.12
|
||||
class EnableChv(ApduCommand, n='ENABLE CHV', ins=0x28, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.13
|
||||
class UnblockChv(ApduCommand, n='UNBLOCK CHV', ins=0x2C, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
_construct_p2 = PinConstructP2
|
||||
|
||||
def process_on_lchan(self, lchan: RuntimeLchan):
|
||||
return VerifyChv._pin_process(self)
|
||||
|
||||
def _is_success(self):
|
||||
return VerifyChv._pin_is_success(self.sw)
|
||||
|
||||
|
||||
# TS 51.011 Section 9.2.14
|
||||
class Invalidate(ApduCommand, n='INVALIDATE', ins=0x04, cla=['A0']):
|
||||
_apdu_case = 1
|
||||
_construct_p1 = BitStruct(BitsInteger(4),
|
||||
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
|
||||
path_from_mf=8, path_from_current_df=9))
|
||||
|
||||
# TS 51.011 Section 9.2.15
|
||||
class Rehabilitate(ApduCommand, n='REHABILITATE', ins=0x44, cla=['A0']):
|
||||
_apdu_case = 1
|
||||
_construct_p1 = Invalidate._construct_p1
|
||||
|
||||
# TS 51.011 Section 9.2.16
|
||||
class RunGsmAlgorithm(ApduCommand, n='RUN GSM ALGORITHM', ins=0x88, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
_construct = Struct('rand'/Bytes(16))
|
||||
_construct_rsp = Struct('sres'/Bytes(4), 'kc'/Bytes(8))
|
||||
|
||||
# TS 51.011 Section 9.2.17
|
||||
class Sleep(ApduCommand, n='SLEEP', ins=0xFA, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.18
|
||||
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.19
|
||||
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
|
||||
# TS 51.011 Section 9.2.20
|
||||
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['A0']):
|
||||
_apdu_case = 4
|
||||
|
||||
# TS 51.011 Section 9.2.21
|
||||
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['A0']):
|
||||
_apdu_case = 2
|
||||
|
||||
# TS 51.011 Section 9.2.22
|
||||
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['A0']):
|
||||
_apdu_case = 3
|
||||
|
||||
|
||||
ApduCommands = ApduCommandSet('TS 51.011', cmds=[SimSelect, SimStatus, ReadBinary, UpdateBinary, ReadRecord,
|
||||
UpdateRecord, Seek, Increase, VerifyChv, ChangeChv, DisableChv,
|
||||
EnableChv, UnblockChv, Invalidate, Rehabilitate, RunGsmAlgorithm,
|
||||
Sleep, GetResponse, TerminalProfile, Envelope, Fetch, TerminalResponse])
|
||||
39
pySim/apdu_source/stdin_hex.py
Normal file
39
pySim/apdu_source/stdin_hex.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# coding=utf-8
|
||||
|
||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from pySim.utils import h2b
|
||||
from pySim.gsmtap import GsmtapSource
|
||||
|
||||
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
|
||||
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
|
||||
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
|
||||
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
|
||||
|
||||
from . import ApduSource, PacketType, CardReset
|
||||
|
||||
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
|
||||
|
||||
class StdinHexApduSource(ApduSource):
|
||||
"""ApduSource for reading apdu hex-strings from stdin."""
|
||||
|
||||
def read_packet(self) -> PacketType:
|
||||
while True:
|
||||
command = input("C-APDU >")
|
||||
response = '9000'
|
||||
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))
|
||||
raise StopIteration
|
||||
@@ -31,6 +31,7 @@ from pySim.exceptions import SwMatchError
|
||||
# CardModel is created, which will add the ATR-based matching and
|
||||
# calling of SysmocomSJA2.add_files. See CardModel.apply_matching_models
|
||||
import pySim.sysmocom_sja2
|
||||
#import pySim.sysmocom_euicc1
|
||||
|
||||
# we need to import these modules so that the various sub-classes of
|
||||
# CardProfile are created, which will be used in init_card() to iterate
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
mainly) ETSI TS 102 223, ETSI TS 101 220 and USIM Application Toolkit (SAT)
|
||||
as described in 3GPP TS 31.111."""
|
||||
|
||||
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
|
||||
# (C) 2021-2024 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
||||
@@ -21,6 +21,8 @@ import io
|
||||
import os
|
||||
from typing import Tuple, List, Optional, Dict, Union
|
||||
from collections import OrderedDict
|
||||
from difflib import SequenceMatcher, Match
|
||||
|
||||
import asn1tools
|
||||
import zipfile
|
||||
from pySim import javacard
|
||||
@@ -44,6 +46,29 @@ asn1 = compile_asn1_subdir('saip')
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class NonMatch(Match):
|
||||
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
|
||||
@classmethod
|
||||
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
|
||||
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
|
||||
The caller must ensure that the input list is ordered, non-overlapping and only contains
|
||||
matches at equal offsets in a and b."""
|
||||
res = []
|
||||
cur = 0
|
||||
for match in l:
|
||||
if match.a != match.b:
|
||||
return ValueError('only works for equal-offset matches')
|
||||
assert match.a >= cur
|
||||
nm_len = match.a - cur
|
||||
if nm_len > 0:
|
||||
# there's no point in generating zero-lenth non-matching sections
|
||||
res.append(cls(a=cur, b=cur, size=nm_len))
|
||||
cur = match.a + match.size
|
||||
if size > cur:
|
||||
res.append(cls(a=cur, b=cur, size=size-cur))
|
||||
|
||||
return res
|
||||
|
||||
class Naa:
|
||||
"""A class defining a Network Access Application (NAA)"""
|
||||
name = None
|
||||
@@ -406,12 +431,35 @@ class File:
|
||||
return ValueError("Unknown key '%s' in tuple list" % k)
|
||||
return stream.getvalue()
|
||||
|
||||
def file_content_to_tuples(self) -> List[Tuple]:
|
||||
# FIXME: simplistic approach. needs optimization. We should first check if the content
|
||||
# matches the expanded default value from the template. If it does, return empty list.
|
||||
# Next, we should compute the diff between the default value and self.body, and encode
|
||||
# that as a sequence of fillFileOffset and fillFileContent tuples.
|
||||
def file_content_to_tuples(self, optimize:bool = True) -> List[Tuple]:
|
||||
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
|
||||
return []
|
||||
if not optimize:
|
||||
# simplistic approach: encode the full file, ignoring the template/default
|
||||
return [('fillFileContent', self.body)]
|
||||
# Try to 'compress' the file body, based on the default file contents.
|
||||
if self.template:
|
||||
default = self.template.expand_default_value_pattern(length=len(self.body))
|
||||
if not default:
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
else:
|
||||
if default == self.body:
|
||||
# 100% match: return an empty tuple list to make eUICC use the default
|
||||
return []
|
||||
sm = SequenceMatcher(a=default, b=self.body)
|
||||
else:
|
||||
# no template at all: we can only remove padding
|
||||
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
|
||||
matching_blocks = sm.get_matching_blocks()
|
||||
# we can only make use of matches that have the same offset in 'a' and 'b'
|
||||
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
|
||||
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
|
||||
ret = []
|
||||
cur = 0
|
||||
for block in non_matching_blocks:
|
||||
ret.append(('fillFileOffset', block.a - cur))
|
||||
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
|
||||
return ret
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "File(%s)" % self.pe_name
|
||||
|
||||
15
pySim/ota.py
15
pySim/ota.py
@@ -302,7 +302,7 @@ class OtaAlgoAuthDES3(OtaAlgoAuth):
|
||||
class OtaAlgoCryptAES(OtaAlgoCrypt):
|
||||
name = 'AES'
|
||||
enum_name = 'aes_cbc'
|
||||
blocksize = 16 # TODO: is this needed?
|
||||
blocksize = 16
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
|
||||
return cipher.encrypt(data)
|
||||
@@ -356,20 +356,20 @@ class OtaDialectSms(OtaDialect):
|
||||
|
||||
# CHL + SPI (+ KIC + KID)
|
||||
part_head = self.hdr_construct.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
|
||||
#print("part_head: %s" % b2h(part_head))
|
||||
print("part_head: %s" % b2h(part_head))
|
||||
|
||||
# CNTR + PCNTR (CNTR not used)
|
||||
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
|
||||
#print("part_cnt: %s" % b2h(part_cnt))
|
||||
print("part_cnt: %s" % b2h(part_cnt))
|
||||
|
||||
envelope_data = part_head + part_cnt + apdu
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
|
||||
# CPL from and including CPI to end of secured data, including any padding for ciphering
|
||||
cpl = len(envelope_data) + len_sig
|
||||
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
|
||||
#print("envelope_data with cpl: %s" % b2h(envelope_data))
|
||||
print("envelope_data with cpl: %s" % b2h(envelope_data))
|
||||
|
||||
if spi['rc_cc_ds'] == 'cc':
|
||||
cc = otak.auth.sign(envelope_data)
|
||||
@@ -383,7 +383,7 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
|
||||
|
||||
#print("envelope_data with sig: %s" % b2h(envelope_data))
|
||||
print("envelope_data with sig: %s" % b2h(envelope_data))
|
||||
|
||||
# encrypt as needed
|
||||
if spi['ciphering']: # ciphering is requested
|
||||
@@ -395,7 +395,7 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
envelope_data = part_head + envelope_data
|
||||
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
if len(envelope_data) > 140:
|
||||
raise ValueError('Cannot encode command in a single SMS; Fragmentation not implemented')
|
||||
@@ -492,6 +492,7 @@ class OtaDialectSms(OtaDialect):
|
||||
else:
|
||||
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
|
||||
|
||||
print(res)
|
||||
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
|
||||
if res.response_status == 'por_ok' and len(res['secured_data']):
|
||||
dec = CompactRemoteResp.parse(res['secured_data'])
|
||||
|
||||
@@ -30,6 +30,13 @@ from smpp.pdu import pdu_types, operations
|
||||
|
||||
BytesOrHex = typing.Union[Hexstr, bytes]
|
||||
|
||||
# 07
|
||||
# 00 03 000201 # part 01 of 02 in reference 00
|
||||
# 70 00
|
||||
|
||||
# 05
|
||||
# 00 03 000202
|
||||
|
||||
class UserDataHeader:
|
||||
# a single IE in the user data header
|
||||
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
|
||||
|
||||
@@ -162,6 +162,28 @@ class EF_SIM_AUTH_KEY(TransparentEF):
|
||||
'key'/Bytes(16),
|
||||
'op_opc' /Bytes(16))
|
||||
|
||||
class EF_HTTPS_CFG(TransparentEF):
|
||||
def __init__(self, fid='6f2a', name='EF.HTTPS_CFG'):
|
||||
super().__init__(fid, name=name, desc='HTTPS configuration')
|
||||
|
||||
class EF_HTTPS_KEYS(TransparentEF):
|
||||
KeyRecord = Struct('security_domain'/Int8ub,
|
||||
'key_type'/Enum(Int8ub, des=0x80, psk=0x85, aes=0x88),
|
||||
'key_version'/Int8ub,
|
||||
'key_id'/Int8ub,
|
||||
'key_length'/Int8ub,
|
||||
'key'/Bytes(this.key_length))
|
||||
def __init__(self, fid='6f2b', name='EF.HTTPS_KEYS'):
|
||||
super().__init__(fid, name=name, desc='HTTPS PSK and DEK keys')
|
||||
self._construct = GreedyRange(self.KeyRecord)
|
||||
|
||||
class EF_HTTPS_POLL(TransparentEF):
|
||||
TimeUnit = Enum(Int8ub, seconds=0, minutes=1, hours=2, days=3, ten_days=4)
|
||||
def __init__(self, fid='6f2c', name='EF.HTTPS_POLL'):
|
||||
super().__init__(fid, name=name, desc='HTTPS polling interval')
|
||||
self._construct = Struct(Const(b'\x82'), 'time_unit'/self.TimeUnit, 'value'/Int8ub,
|
||||
'adm_session_triggering_tlv'/GreedyBytes)
|
||||
|
||||
|
||||
class DF_SYSTEM(CardDF):
|
||||
def __init__(self):
|
||||
@@ -180,6 +202,9 @@ class DF_SYSTEM(CardDF):
|
||||
EF_0348_COUNT(),
|
||||
EF_GP_COUNT(),
|
||||
EF_GP_DIV_DATA(),
|
||||
EF_HTTPS_CFG(),
|
||||
EF_HTTPS_KEYS(),
|
||||
EF_HTTPS_POLL(),
|
||||
]
|
||||
self.add_files(files)
|
||||
|
||||
|
||||
@@ -333,11 +333,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
from pySim.transport.pcsc import PcscSimLink
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
from pySim.transport.calypso import CalypsoSimLink
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
|
||||
SerialSimLink.argparse_add_reader_args(arg_parser)
|
||||
PcscSimLink.argparse_add_reader_args(arg_parser)
|
||||
ModemATCommandLink.argparse_add_reader_args(arg_parser)
|
||||
CalypsoSimLink.argparse_add_reader_args(arg_parser)
|
||||
WsrcSimLink.argparse_add_reader_args(arg_parser)
|
||||
arg_parser.add_argument('--apdu-trace', action='store_true',
|
||||
help='Trace the command/response APDUs exchanged with the card')
|
||||
|
||||
@@ -360,6 +362,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
|
||||
elif opts.modem_dev is not None:
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
sl = ModemATCommandLink(opts, **kwargs)
|
||||
elif opts.wsrc_server_url is not None:
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
sl = WsrcSimLink(opts, **kwargs)
|
||||
else: # Serial reader is default
|
||||
print("No reader/driver specified; falling back to default (Serial reader)")
|
||||
from pySim.transport.serial import SerialSimLink
|
||||
|
||||
101
pySim/transport/wsrc.py
Normal file
101
pySim/transport/wsrc.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import argparse
|
||||
from typing import Optional
|
||||
|
||||
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
|
||||
|
||||
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
|
||||
from pySim.transport import LinkBase
|
||||
from pySim.utils import ResTuple
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
|
||||
class UserWsClientBlocking(WsClientBlocking):
|
||||
def perform_outbound_hello(self):
|
||||
hello_data = {
|
||||
'client_type': 'user',
|
||||
}
|
||||
super().perform_outbound_hello(hello_data)
|
||||
|
||||
def select_card(self, id_type:str, id_str:str):
|
||||
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
|
||||
'select_card_ack')
|
||||
return rx
|
||||
|
||||
def reset_card(self):
|
||||
self.transceive_json('reset_req', {}, 'reset_resp')
|
||||
|
||||
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return rx['response'], rx['sw']
|
||||
|
||||
|
||||
class WsrcSimLink(LinkBase):
|
||||
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
|
||||
name = 'WSRC'
|
||||
|
||||
def __init__(self, opts: argparse.Namespace, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.identities = {}
|
||||
self.server_url = opts.wsrc_server_url
|
||||
if opts.wsrc_eid:
|
||||
self.id_type = 'eid'
|
||||
self.id_str = opts.wsrc_eid
|
||||
elif opts.wsrc_iccid:
|
||||
self.id_type = 'iccid'
|
||||
self.id_str = opts.wsrc_iccid
|
||||
self.client = UserWsClientBlocking(self.server_url)
|
||||
self.client.connect()
|
||||
|
||||
def __del__(self):
|
||||
# FIXME: disconnect from server
|
||||
pass
|
||||
|
||||
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
|
||||
self.connect()
|
||||
|
||||
def connect(self):
|
||||
rx = self.client.select_card(self.id_type, self.id_str)
|
||||
self.identitites = rx['identities']
|
||||
|
||||
def get_atr(self) -> Hexstr:
|
||||
return self.identities['atr']
|
||||
|
||||
def disconnect(self):
|
||||
self.__delete__()
|
||||
|
||||
def _reset_card(self):
|
||||
self.client.reset_card()
|
||||
return 1
|
||||
|
||||
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
|
||||
return self.client.xceive_apdu_raw(pdu)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
|
||||
|
||||
@staticmethod
|
||||
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
|
||||
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
|
||||
can be accessed via a network.""")
|
||||
wsrc_group.add_argument('--wsrc-server-url',
|
||||
help='URI of the WSRC server to connect to')
|
||||
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
|
||||
help='ICCID of the card to open via WSRC')
|
||||
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
|
||||
help='EID of the card to open via WSRC')
|
||||
0
pySim/wsrc/__init__.py
Normal file
0
pySim/wsrc/__init__.py
Normal file
65
pySim/wsrc/client_blocking.py
Normal file
65
pySim/wsrc/client_blocking.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
from websockets.sync.client import connect
|
||||
from osmocom.utils import b2h
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WsClientBlocking(abc.ABC):
|
||||
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, ws_uri: str):
|
||||
self.ws_uri = ws_uri
|
||||
self.ws = None
|
||||
|
||||
def connect(self):
|
||||
self.ws = connect(uri=self.ws_uri)
|
||||
self.perform_outbound_hello()
|
||||
|
||||
def tx_json(self, msg_type: str, d: dict = {}):
|
||||
"""JSON-Encode and transmit a message to the given websocket."""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
logger.debug("Tx: %s", d_js)
|
||||
self.ws.send(d_js)
|
||||
|
||||
def tx_error(self, message: str):
|
||||
event = {
|
||||
'message': message,
|
||||
}
|
||||
self.tx_json('error', event)
|
||||
|
||||
def rx_json(self):
|
||||
"""Receive a single message from the given websocket and JSON-decode it."""
|
||||
rx = self.ws.recv()
|
||||
rx_js = json.loads(rx)
|
||||
logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
|
||||
self.tx_json(tx_msg_type, tx_d)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == rx_msg_type
|
||||
return rx
|
||||
|
||||
def perform_outbound_hello(self, tx: dict = {}):
|
||||
self.tx_json('hello', tx)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == 'hello_ack'
|
||||
return rx
|
||||
|
||||
def rx_and_execute_cmd(self):
|
||||
"""Receve and dispatch/execute a single command from the server."""
|
||||
rx = self.rx_json()
|
||||
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
|
||||
if handler:
|
||||
handler(rx)
|
||||
else:
|
||||
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
|
||||
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])
|
||||
@@ -15,3 +15,4 @@ git+https://github.com/osmocom/asn1tools
|
||||
packaging
|
||||
git+https://github.com/hologram-io/smpp.pdu
|
||||
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
|
||||
smpplib
|
||||
|
||||
142
smpp_ota_apdu2.py
Executable file
142
smpp_ota_apdu2.py
Executable file
@@ -0,0 +1,142 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import sys
|
||||
from pprint import pprint as pp
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
import smpplib.gsm
|
||||
import smpplib.client
|
||||
import smpplib.consts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# if you want to know what's happening
|
||||
logging.basicConfig(level='DEBUG')
|
||||
|
||||
class Foo:
|
||||
def smpp_rx_handler(self, pdu):
|
||||
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
|
||||
if pdu.short_message:
|
||||
try:
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||
except ValueError:
|
||||
spi = self.spi.copy()
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||
pp(dec)
|
||||
return None
|
||||
|
||||
def __init__(self):
|
||||
# Two parts, UCS2, SMS with UDH
|
||||
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
|
||||
|
||||
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
|
||||
|
||||
# Print when obtain message_id
|
||||
client.set_message_sent_handler(
|
||||
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
|
||||
#client.set_message_received_handler(
|
||||
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
|
||||
client.set_message_received_handler(self.smpp_rx_handler)
|
||||
|
||||
client.connect()
|
||||
client.bind_transceiver(system_id='test', password='test')
|
||||
|
||||
self.client = client
|
||||
|
||||
if False:
|
||||
KIC1 = h2b('000102030405060708090a0b0c0d0e0f')
|
||||
KID1 = h2b('101112131415161718191a1b1c1d1e1f')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
|
||||
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
|
||||
|
||||
if False:
|
||||
KIC1 = h2b('4BE2D58A1FA7233DD723B3C70996E6E6')
|
||||
KID1 = h2b('4a664208eba091d32c4ecbc299da1f34')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
|
||||
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
|
||||
#KIC3 = h2b('A4074D8E1FE69B484A7E62682ED09B51')
|
||||
#KID3 = h2b('41FF1033910112DB4EBEBB7807F939CD')
|
||||
#self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
# algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
#self.tar = h2b('B00011') # USIM RFM
|
||||
self.tar = h2b('000000') # RAM
|
||||
|
||||
if False: # sysmoEUICC1-C2G
|
||||
KIC1 = h2b('B52F9C5938D1C19ED73E1AE772937FD7')
|
||||
KID1 = h2b('3BC696ACD1EEC95A6624F7330D22FC81')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
|
||||
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
|
||||
|
||||
if False: # TS.48 profile
|
||||
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
|
||||
KID1 = h2b('112233445566778899aabbccddeeff10')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
|
||||
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
|
||||
self.tar = h2b('000000') # ISD-P according to FIXME
|
||||
|
||||
if False: # TS.48 profile AES
|
||||
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
|
||||
KID1 = h2b('112233445566778899aabbccddeeff10')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=2, kid=KID1)
|
||||
self.tar = h2b('000000') # ISD-P according to FIXME
|
||||
|
||||
if True: # eSIM profile
|
||||
KIC1 = h2b('207c5d2c1aa80d58cd8f542fb9ef2f80')
|
||||
KID1 = h2b('312367f1681902fd67d9a71c62a840e3')
|
||||
#KIC1 = h2b('B6B0E130EEE1C9A4A29DAEC034D35C9E')
|
||||
#KID1 = h2b('889C51CD2A381A9D4EDDA9224C24A9AF')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
|
||||
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
|
||||
self.ota_keyset.cntr = 7
|
||||
self.tar = h2b('b00001') # ADF.USIM
|
||||
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.spi = {'counter':'counter_must_be_higher', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
self.client.send_message(
|
||||
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure it is a byte string, not unicode:
|
||||
source_addr='12',
|
||||
|
||||
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure thease two params are byte strings, not unicode:
|
||||
destination_addr='23',
|
||||
short_message=tpdu,
|
||||
|
||||
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||
protocol_id=0x7f,
|
||||
#registered_delivery=True,
|
||||
)
|
||||
|
||||
def tx_c_apdu(self, apdu: bytes):
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
|
||||
|
||||
f = Foo()
|
||||
print("initialized")
|
||||
#f.tx_c_apdu(h2b('80a40400023f00'))
|
||||
#f.tx_c_apdu(h2b('80EC010100'))
|
||||
#f.tx_c_apdu(h2b('80EC0101' + '0E' + '350103' + '390203e8' + '3e052101020304'))
|
||||
f.tx_c_apdu(h2b('00a40004026f07' + '00b0000009'))
|
||||
f.client.listen()
|
||||
@@ -90,5 +90,34 @@ class OidTest(unittest.TestCase):
|
||||
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
|
||||
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
|
||||
|
||||
class NonMatchTest(unittest.TestCase):
|
||||
def test_nonmatch(self):
|
||||
# non-matches before, in between and after matches
|
||||
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 26)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
|
||||
NonMatch(a=24, b=24, size=2)])
|
||||
|
||||
def test_nonmatch_beg(self):
|
||||
# single match at beginning
|
||||
match_list = [Match(a=0, b=0, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 20)
|
||||
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
|
||||
|
||||
def test_nonmatch_end(self):
|
||||
# single match at end
|
||||
match_list = [Match(a=19, b=19, size=5)]
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
|
||||
|
||||
def test_nonmatch_none(self):
|
||||
# no match at all
|
||||
match_list = []
|
||||
nm_list = NonMatch.from_matchlist(match_list, 24)
|
||||
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -16,6 +16,16 @@ SPI_CC_POR_CIPHERED_CC = {
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_CTR_POR_UNCIPHERED_CC = {
|
||||
'counter':'counter_must_be_higher',
|
||||
'ciphering':True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_POR_UNCIPHERED_CC = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':True,
|
||||
@@ -36,6 +46,17 @@ SPI_CC_POR_UNCIPHERED_NOCC = {
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
SPI_CC_UNCIPHERED = {
|
||||
'counter':'no_counter',
|
||||
'ciphering':False,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False,
|
||||
'por_rc_cc_ds': 'no_rc_cc_ds',
|
||||
'por': 'por_required'
|
||||
}
|
||||
|
||||
|
||||
######################################################################
|
||||
# old-style code-driven test (lots of code copy+paste)
|
||||
######################################################################
|
||||
@@ -74,6 +95,48 @@ class Test_SMS_AES128(unittest.TestCase):
|
||||
self.assertEqual(b2h(dec_tar), b2h(self.tar))
|
||||
self.assertEqual(dec_spi, spi)
|
||||
|
||||
def test_open_channel(self):
|
||||
spi = self.spi_base
|
||||
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('000102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('101112131415161718191a1b1c1d1e1f'))
|
||||
tpdu = h2b('40048111227ff6407070611535007e070003000201700000781516011212000001ccda206e8b0d46304247bf00bfdc9853eed2a826f9af8dc7c2974ce2cb9bb55cc1a8577e047cc8f5d450380ba86b25354fe69f58884f671d7ace0c911f7c74830dc1d58b62cce4934568697ba1f577eecbca26c5dbfa32b0e2f0877948a9fb46a122e4214947386f467de11c')
|
||||
#'40048111227ff6407070611535000a0500030002027b6abed3'
|
||||
submit = SMS_DELIVER.from_bytes(tpdu)
|
||||
submit.tp_udhi = True
|
||||
print(submit)
|
||||
#print("UD: %s" % b2h(submit.tp_ud))
|
||||
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
|
||||
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
|
||||
print("UDHD: %s" % udhd)
|
||||
print("DATA: %s" % b2h(data))
|
||||
tpdu2 = h2b('40048111227ff6407070611535000a0500030002027b6abed3')
|
||||
submit2 = SMS_DELIVER.from_bytes(tpdu2)
|
||||
print(submit2)
|
||||
udhd2, data2 = UserDataHeader.from_bytes(submit2.tp_ud)
|
||||
print("UDHD: %s" % udhd2)
|
||||
print("DATA: %s" % b2h(data2))
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data + data2)
|
||||
print(b2h(dec_tar), b2h(dec_apdu))
|
||||
|
||||
def test_open_channel2(self):
|
||||
spi = self.spi_base
|
||||
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('000102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('101112131415161718191a1b1c1d1e1f'))
|
||||
tpdu = h2b('40048111227ff6407070611535004d02700000481516011212000001fe4c0943aea42e45021c078ae06c66afc09303608874b72f58bacadb0dcf665c29349c799fbb522e61709c9baf1890015e8e8e196e36153106c8b92f95153774')
|
||||
submit = SMS_DELIVER.from_bytes(tpdu)
|
||||
submit.tp_udhi = True
|
||||
print(submit)
|
||||
#print("UD: %s" % b2h(submit.tp_ud))
|
||||
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
|
||||
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
|
||||
print("UDHD: %s" % udhd)
|
||||
print("DATA: %s" % b2h(data))
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data)
|
||||
print(b2h(dec_tar), b2h(dec_apdu))
|
||||
|
||||
class Test_SMS_3DES(unittest.TestCase):
|
||||
tar = h2b('b00000')
|
||||
@@ -178,6 +241,13 @@ OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
|
||||
kic=h2b('200102030405060708090a0b0c0d0e0f'),
|
||||
kid=h2b('201102030405060708090a0b0c0d0e0f'))
|
||||
|
||||
OTA_KEYSET_eSIM_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
|
||||
algo_auth='aes_cmac', kid_idx=1,
|
||||
kic=h2b('207c5d2c1aa80d58cd8f542fb9ef2f80'),
|
||||
kid=h2b('312367f1681902fd67d9a71c62a840e3'))
|
||||
OTA_KEYSET_eSIM_AES128.cntr = 2
|
||||
|
||||
|
||||
class OtaTestCase(unittest.TestCase):
|
||||
def __init__(self, methodName='runTest', **kwargs):
|
||||
super().__init__(methodName, **kwargs)
|
||||
@@ -185,6 +255,7 @@ class OtaTestCase(unittest.TestCase):
|
||||
# SIM RFM: B00010
|
||||
# USIM RFM: B00011
|
||||
self.tar = h2b('B00011')
|
||||
self.tar = h2b('B00000')
|
||||
|
||||
class SmsOtaTestCase(OtaTestCase):
|
||||
# Array describing the input/output data for the tests. We use the
|
||||
@@ -193,70 +264,102 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
# manually writing one class per test.
|
||||
testdatasets = [
|
||||
{
|
||||
'name': '3DES-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
# 'name': '3DES-SJA5-CIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
# 'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
# 'name': 'AES128-SJA5-CIPHERED-CC',
|
||||
# 'ota_keyset': OTA_KEYSET_SJA5_AES128,
|
||||
# 'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
# 'request': {
|
||||
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
# 'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
# },
|
||||
# 'response': {
|
||||
# 'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
# 'response_status': 'por_ok',
|
||||
# 'number_of_commands': 1,
|
||||
# 'last_status_word': '6132',
|
||||
# 'last_response_data': '',
|
||||
# }
|
||||
# }, {
|
||||
'name': 'AES128-eSIM-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_eSIM_AES128,
|
||||
'spi': SPI_CC_UNCIPHERED,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
|
||||
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
|
||||
'encoded_cmd': '1502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
|
||||
'encoded_tpdu': '40048111227FF6407070611535002702700000221502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
'encoded_resp': '027100000B0AB000000000000000000A9000',
|
||||
'response_status': 'insufficient_security_level',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_CC,
|
||||
'name': 'AES128-eSIM-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_eSIM_AES128,
|
||||
'spi': SPI_CC_CTR_POR_UNCIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
|
||||
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
|
||||
'encoded_cmd': '00281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
|
||||
'encoded_tpdu': '40048111227FF6407070611535002D02700000281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
}, {
|
||||
'name': '3DES-SJA5-UNCIPHERED-NOCC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
|
||||
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100000e0ab0001100000000000000016132',
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
}, {
|
||||
'name': 'AES128-SJA5-CIPHERED-CC',
|
||||
'ota_keyset': OTA_KEYSET_SJA5_AES128,
|
||||
'spi': SPI_CC_POR_CIPHERED_CC,
|
||||
'request': {
|
||||
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
|
||||
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
|
||||
},
|
||||
'response': {
|
||||
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
|
||||
'encoded_resp': '027100001C12B00000000000000200007DB7086951F7DB55AB0780010123026D009000',
|
||||
#'encoded_resp': '027100000b0ab0000100000000000006'
|
||||
#'encoded_resp': '027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c'
|
||||
'response_status': 'por_ok',
|
||||
'number_of_commands': 1,
|
||||
'last_status_word': '6132',
|
||||
'last_response_data': '',
|
||||
}
|
||||
},
|
||||
|
||||
# TODO: AES192
|
||||
# TODO: AES256
|
||||
]
|
||||
@@ -272,7 +375,7 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
kset = t['ota_keyset']
|
||||
outp = self.dialect.encode_cmd(kset, self.tar, t['spi'], apdu=t['request']['apdu'])
|
||||
#print("result: %s" % b2h(outp))
|
||||
self.assertEqual(b2h(outp), t['request']['encoded_cmd'])
|
||||
self.assertEqual(b2h(outp), t['request']['encoded_cmd'].lower())
|
||||
|
||||
with_udh = b'\x02\x70\x00' + outp
|
||||
#print("with_udh: %s" % b2h(with_udh))
|
||||
@@ -281,7 +384,7 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
|
||||
#print("TPDU: %s" % tpdu)
|
||||
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
|
||||
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
|
||||
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'].lower())
|
||||
|
||||
# also test decoder
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(kset, outp)
|
||||
@@ -296,8 +399,11 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
r, d = self.dialect.decode_resp(kset, t['spi'], t['response']['encoded_resp'])
|
||||
#print("RESP: %s / %s" % (r, d))
|
||||
self.assertEqual(r.response_status, t['response']['response_status'])
|
||||
if 'number_of_commands' in t['response']:
|
||||
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
|
||||
if 'last_status_word' in t['response']:
|
||||
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
|
||||
if 'last_response_data' in t['response']:
|
||||
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
301
vpcd2smpp.py
Executable file
301
vpcd2smpp.py
Executable file
@@ -0,0 +1,301 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# This program receive APDUs via the VPCD protocol of Frank Morgner's
|
||||
# virtualsmartcard, encrypts them with OTA (over the air) keys and
|
||||
# forwards them via SMPP to a SMSC (SMS service centre).
|
||||
#
|
||||
# In other words, you can use it as a poor man's OTA server, to enable
|
||||
# you to use unmodified application software with PC/SC support to talk
|
||||
# securely via OTA with a remote SMS card.
|
||||
#
|
||||
# This is very much a work in progress at this point.
|
||||
|
||||
#######################################################################
|
||||
# twisted VPCD Library
|
||||
#######################################################################
|
||||
|
||||
import logging
|
||||
import struct
|
||||
import abc
|
||||
from typing import Union, Optional
|
||||
from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
|
||||
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class VirtualCard(abc.ABC):
|
||||
"""Abstract base class for a virtual smart card."""
|
||||
def __init__(self, atr: Union[str, bytes]):
|
||||
if isinstance(atr, str):
|
||||
atr = h2b(atr)
|
||||
self.atr = atr
|
||||
|
||||
@abc.abstractmethod
|
||||
def power_change(self, new_state: bool):
|
||||
"""Power the card on or off."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self):
|
||||
"""Reset the card."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def rx_c_apdu(self, apdu: bytes):
|
||||
"""Receive a C-APDU from the reader/application."""
|
||||
pass
|
||||
|
||||
def tx_r_apdu(self, apdu: Union[str, bytes]):
|
||||
if isinstance(apdu, str):
|
||||
apdu = h2b(apdu)
|
||||
logger.info("R-APDU: %s" % b2h(apdu))
|
||||
self.protocol.send_data(apdu)
|
||||
|
||||
class VpcdProtocolBase(Protocol):
|
||||
# Prefixed couldn't be used as the this.length wouldn't be available in this case
|
||||
construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
|
||||
'data'/If(this.length > 1, Bytes(this.length)),
|
||||
'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
|
||||
def __init__(self, vcard: VirtualCard):
|
||||
self.recvBuffer = b''
|
||||
self.connectionCorrupted = False
|
||||
self.pduReadTimer = None
|
||||
self.pduReadTimerSecs = 10
|
||||
self.callLater = reactor.callLater
|
||||
self.on = False
|
||||
self.vcard = vcard
|
||||
self.vcard.protocol = self
|
||||
|
||||
def dataReceived(self, data: bytes):
|
||||
"""entry point where twisted tells us data was received."""
|
||||
#logger.debug('Data received: %s' % b2h(data))
|
||||
self.recvBuffer = self.recvBuffer + data
|
||||
while True:
|
||||
if self.connectionCorrupted:
|
||||
return
|
||||
msg = self.readMessage()
|
||||
if msg is None:
|
||||
break
|
||||
self.endPDURead()
|
||||
self.rawMessageReceived(msg)
|
||||
|
||||
if len(self.recvBuffer) > 0:
|
||||
self.incompletePDURead()
|
||||
|
||||
def incompletePDURead(self):
|
||||
"""We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
|
||||
if self.pduReadTimer and self.pduReadTimer.active():
|
||||
return
|
||||
self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
|
||||
|
||||
def endPDURead(self):
|
||||
"""We completed reading a PDU, cancel the pduReadTimer."""
|
||||
if self.pduReadTimer and self.pduReadTimer.active():
|
||||
self.pduReadTimer.cancel()
|
||||
|
||||
def readMessage(self) -> Optional[bytes]:
|
||||
"""read an entire [raw] message."""
|
||||
pduLen = self._getMessageLength()
|
||||
if pduLen is None:
|
||||
return None
|
||||
return self._getMessage(pduLen)
|
||||
|
||||
def _getMessageLength(self) -> Optional[int]:
|
||||
if len(self.recvBuffer) < 2:
|
||||
return None
|
||||
return struct.unpack('!H', self.recvBuffer[:2])[0]
|
||||
|
||||
def _getMessage(self, pduLen: int) -> Optional[bytes]:
|
||||
if len(self.recvBuffer) < pduLen+2:
|
||||
return None
|
||||
|
||||
message = self.recvBuffer[:pduLen+2]
|
||||
self.recvBuffer = self.recvBuffer[pduLen+2:]
|
||||
return message
|
||||
|
||||
def onPDUReadTimeout(self):
|
||||
logger.error('PDU read timed out. Buffer is now considered corrupt')
|
||||
#self.coruptDataReceived
|
||||
|
||||
def rawMessageReceived(self, message: bytes):
|
||||
"""Called once a complete binary vpcd message has been received."""
|
||||
pdu = None
|
||||
try:
|
||||
pdu = VpcdProtocolBase.construct.parse(message)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.critical('Received corrupt PDU %s' % b2h(message))
|
||||
#self.corupDataRecvd()
|
||||
else:
|
||||
self.PDUReceived(pdu)
|
||||
|
||||
def PDUReceived(self, pdu):
|
||||
logger.debug("Rx PDU: %s" % pdu)
|
||||
if pdu['data']:
|
||||
return self.on_rx_data(pdu)
|
||||
else:
|
||||
method = getattr(self, 'on_rx_' + pdu['ctrl'])
|
||||
return method(pdu)
|
||||
|
||||
def on_rx_atr(self, pdu):
|
||||
self.send_data(self.vcard.atr)
|
||||
|
||||
def on_rx_on(self, pdu):
|
||||
if self.on:
|
||||
return
|
||||
else:
|
||||
self.on = True
|
||||
self.vcard.power_change(self.on)
|
||||
|
||||
def on_rx_reset(self, pdu):
|
||||
self.vcard.reset()
|
||||
|
||||
def on_rx_off(self, pdu):
|
||||
if not self.on:
|
||||
return
|
||||
else:
|
||||
self.on = False
|
||||
self.vcard.power_change(self.on)
|
||||
|
||||
def on_rx_data(self, pdu):
|
||||
self.vcard.rx_c_apdu(pdu['data'])
|
||||
|
||||
def send_pdu(self, pdu):
|
||||
logger.debug("Sending PDU: %s" % pdu)
|
||||
encoded = VpcdProtocolBase.construct.build(pdu)
|
||||
#logger.debug("Sending binary: %s" % b2h(encoded))
|
||||
self.transport.write(encoded)
|
||||
|
||||
def send_data(self, data: Union[str, bytes]):
|
||||
if isinstance(data, str):
|
||||
data = h2b(data)
|
||||
return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
|
||||
|
||||
def send_ctrl(self, ctrl: str):
|
||||
return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
|
||||
|
||||
|
||||
class VpcdProtocolClient(VpcdProtocolBase):
|
||||
pass
|
||||
|
||||
|
||||
class VpcdClientFactory(ReconnectingClientFactory):
|
||||
def __init__(self, vcard_class: VirtualCard):
|
||||
self.vcard_class = vcard_class
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
logger.debug('Started to connect')
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
logger.info('Connection established to %s' % addr)
|
||||
self.resetDelay()
|
||||
return VpcdProtocolClient(vcard = self.vcard_class())
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
logger.warning('Connection lost (reason: %s)' % reason)
|
||||
super().clientConnectionLost(connector, reason)
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
logger.warning('Connection failed (reason: %s)' % reason)
|
||||
super().clientConnectionFailed(connector, reason)
|
||||
|
||||
#######################################################################
|
||||
# Application
|
||||
#######################################################################
|
||||
|
||||
from pprint import pprint as pp
|
||||
|
||||
from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
|
||||
from twisted.internet import reactor
|
||||
|
||||
from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
|
||||
from smpp.twisted.protocol import SMPPClientProtocol
|
||||
from smpp.twisted.config import SMPPClientConfig
|
||||
from smpp.pdu.operations import SubmitSM, DeliverSM
|
||||
from smpp.pdu import pdu_types
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
|
||||
class MyVcard(VirtualCard):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
|
||||
self.smpp_client = None
|
||||
# KIC1 + KID1 of 8988211000000467285
|
||||
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
|
||||
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
|
||||
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
|
||||
KID3 = h2b('12110C78E678C25408233076AA033615')
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.tar = h2b('B00011')
|
||||
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
def ensure_smpp(self):
|
||||
config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
|
||||
if self.smpp_client:
|
||||
return
|
||||
self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
|
||||
smpp = self.smpp_client.connectAndBind()
|
||||
#self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
|
||||
#d = self.smpp.connectTCP(config.host, config.port)
|
||||
#d = self.smpp.connectAndBind()
|
||||
#d.addCallback(self.forwardToClient, self.smpp)
|
||||
|
||||
def power_change(self, new_state: bool):
|
||||
if new_state:
|
||||
logger.info("POWER ON")
|
||||
self.ensure_smpp()
|
||||
else:
|
||||
logger.info("POWER OFF")
|
||||
|
||||
def reset(self):
|
||||
logger.info("RESET")
|
||||
|
||||
def rx_c_apdu(self, apdu: bytes):
|
||||
pp(self.smpp_client.smpp)
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
#self.tx_r_apdu('9000')
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
"""Send a SMS TPDU via SMPP SubmitSM."""
|
||||
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
|
||||
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
|
||||
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
|
||||
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
|
||||
submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
|
||||
protocol_id=0x7f, short_message=tpdu)
|
||||
self.smpp_client.smpp.sendDataRequest(submit)
|
||||
|
||||
def handleSmpp(self, smpp, pdu):
|
||||
#logger.info("Received SMPP %s" % pdu)
|
||||
data = pdu.params['short_message']
|
||||
#logger.info("Received SMS Data %s" % b2h(data))
|
||||
r, d = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
|
||||
logger.info("Decoded SMPP %s" % r)
|
||||
self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
import colorlog
|
||||
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
|
||||
colorlog.basicConfig(level=logging.INFO, format = log_format)
|
||||
logger = colorlog.getLogger()
|
||||
|
||||
from twisted.internet import reactor
|
||||
host = 'localhost'
|
||||
port = 35963
|
||||
reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
|
||||
reactor.run()
|
||||
Reference in New Issue
Block a user