21 Commits

Author SHA1 Message Date
Harald Welte
b767a78935 Fixup smpp_ota_apdu2.py
Change-Id: I992f70385b716fd1c1df2d245acfeb5d172e3ff9
2025-05-07 19:35:54 +02:00
Harald Welte
89ff53922e [UNTESTED] sysmocom_sja2: Support files related to OTA HTTPS features
Change-Id: I5710b82c2eea6b6bf5b38882b2a1ec7d60a725d8
2025-05-07 19:35:54 +02:00
Harald Welte
9e9db415b9 WIP: initial step towards websocket-based remote card [reader] access
Change-Id: I588bf4b3d9891766dd688a6818057ca20fb26e3f
2025-05-07 19:35:54 +02:00
Harald Welte
3203f7d0ff fsdump2saip: treat maxFileSize for BER-TLV
Change-Id: If699f94998bf3fba47231c19b794b0f473dfabf5
2025-05-07 19:35:54 +02:00
Harald Welte
bc23a2cc7b WIP: classic SIM (3GPP TS 51.011) support.
Change-Id: I1cbbbabd22a67048f3ee9330c12f72c34152ce45
2025-05-07 19:35:54 +02:00
Harald Welte
a5dd041f4c WIP: contrib/fsdump2saip
Change-Id: I8fc23b4177f229170a6ee99eca8863518196fa51
2025-05-07 19:35:54 +02:00
Harald Welte
8e05d83913 pySim.apdu_source.stdin_hex
Change-Id: I5aacf13b7c27cea9efd42f01dacca61068c3aa33
2025-05-07 19:35:54 +02:00
Harald Welte
4f73968bde pySim.esim.saip: Don't try to generate file contents for MF/DF/ADF
only EFs have data content

Change-Id: I02a54a3b2f73a0e9118db87f8b514d1dbf53971f
2025-05-07 19:35:54 +02:00
Harald Welte
f61196ace8 pySim.esim.saip: Implement optimized file content encoding
Make sure we make use of the fill pattern when encoding file contents:
Only encode the differences to the fill pattern of the file, in order
to reduce the profile download size.

Change-Id: I61e4a5e04beba5c9092979fc546292d5ef3d7aad
2025-05-07 19:35:54 +02:00
Harald Welte
286d96c8ad PRIVATE WIP
Change-Id: Id2d5505ecd3fac9daa91d58ba2c08d4d48fbbdd2
2025-05-07 19:35:54 +02:00
Harald Welte
165b145d48 WIP OTA testing
Change-Id: I96d4556fb9b1978cc2a1bfdcfbab9a13284423e4
2025-05-07 19:35:54 +02:00
Harald Welte
e707a2fe0b WIP
Change-Id: If7a0d6fb5e0a823530a1e24062712615fc414ce5
2025-05-07 19:35:54 +02:00
Harald Welte
dd2dc510d3 tests/test_ota: case-independent compare; partial responses
* use case-insensitive string compares for hexdumps
* not all responses contain all fields; make their verification optional

Change-Id: Ied6101236836450a0f14f0b42372c758378d17e3
2025-05-07 19:35:54 +02:00
Harald Welte
b4a5776815 PRIVATE: different key material
Change-Id: Id041aae86b9984928e39dd4eb4d25badc85973a5
2025-05-07 19:35:54 +02:00
Harald Welte
227da5935f ota_apdu2: add example keys of C2T
Change-Id: I3aa6efff615bf6a089eb4c0727069c1fdc5883f0
2025-05-07 19:35:54 +02:00
Harald Welte
3fae277dfa HACK
Change-Id: Iac752c8ef6cbc085c2f190be1e07a660fe3cd18a
2025-05-07 19:35:54 +02:00
Harald Welte
d45123f6ac WIP
Change-Id: Id3ae902ffb0a25fb69e2ebb469f925b7482f8d26
2025-05-07 19:35:54 +02:00
Harald Welte
ad9c01f1c1 ota_test
Change-Id: I9d3f9b16dc885f4e2b864a976d6bc09b3c17b2ee
2025-05-07 19:35:54 +02:00
Harald Welte
d2f1ee4c0e WIP: vpcd2smpp.py
Change-Id: I501f2fea075706df379a4bd65a7c6bc19f48277f
2025-05-07 19:35:54 +02:00
Harald Welte
82c6575a77 smpp_ota_apdu2: Re-try decoding PoR without crypto if it fails with
If something prevents the response from being ciphered, we need to fall
back to plaintext.

Change-Id: I748c5690029c28a0fcf39a2d99bc6c03fcb33dbd
2025-05-07 19:35:54 +02:00
Harald Welte
dc5fdfca39 hack: smpp_ota_apdu2.py
Test program...

Change-Id: Idf8326298cab7ebc68b09c7e829bfc2061222f51
2025-05-07 19:35:54 +02:00
24 changed files with 2018 additions and 70 deletions

97
contrib/card2server.py Executable file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import sys
import json
import logging
import argparse
from osmocom.utils import b2h
import websockets
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
from pySim.commands import SimCardCommands
from pySim.wsrc.client_blocking import WsClientBlocking
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
class CardWsClientBlocking(WsClientBlocking):
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
def __init__(self, ws_uri, tp: LinkBase):
super().__init__(ws_uri)
self.tp = tp
def perform_outbound_hello(self):
hello_data = {
'client_type': 'card',
'atr': b2h(self.tp.get_atr()),
# TODO: include various card information in the HELLO message
}
super().perform_outbound_hello(hello_data)
def handle_rx_c_apdu(self, rx: dict):
"""handle an inbound APDU transceive command"""
data, sw = self.tp.send_apdu(rx['command'])
tx = {
'response': data,
'sw': sw,
}
self.tx_json('r_apdu', tx)
def handle_rx_disconnect(self, rx: dict):
"""server tells us to disconnect"""
self.tx_json('disconnect_ack')
# FIXME: tear down connection and/or terminate entire program
def handle_rx_print(self, rx: dict):
"""print a message (text) given by server to the local console/log"""
print(rx['message'])
# no response
def handle_rx_reset_req(self, rx: dict):
"""server tells us to reset the card"""
self.tp.reset()
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
parser = argparse.ArgumentParser()
argparse_add_reader_args(parser)
parser.add_argument("--uri", default="ws://localhost:8765/",
help="URI of the sever to which to connect")
opts = parser.parse_args()
# open the card reader / slot
logger.info("Initializing Card Reader...")
tp = init_reader(opts)
logger.info("Connecting to Card...")
tp.connect()
scc = SimCardCommands(transport=tp)
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
# TODO: gather various information about the card; print it
# create + connect the client to the server
cl = CardWsClientBlocking(opts.uri, tp)
logger.info("Connecting to remote server...")
try:
cl.connect()
print("Successfully connected to Server")
except ConnectionRefusedError as e:
print(e)
sys.exit(1)
try:
while True:
# endless loop: wait for inbound command from server + execute it
cl.rx_and_execute_cmd()
# TODO: clean handling of websocket disconnect
except websockets.exceptions.ConnectionClosedOK as e:
print(e)
sys.exit(1)
except KeyboardInterrupt as e:
print(e.__class__.__name__)
sys.exit(2)

241
contrib/card_server.py Executable file
View File

@@ -0,0 +1,241 @@
#!/usr/bin/env python
import json
import asyncio
import logging
from typing import Optional, Tuple
from websockets.asyncio.server import serve
from websockets.exceptions import ConnectionClosedError
from osmocom.utils import Hexstr, swap_nibbles
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
from pySim.exceptions import SwMatchError
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
card_clients = set()
user_clients = set()
class WsClient:
def __init__(self, websocket, hello: dict):
self.websocket = websocket
self.hello = hello
self.identity = {}
def __str__(self):
return '%s(ws=%s)' % (self.__class__.__name__, self.websocket)
async def rx_json(self):
rx = await self.websocket.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
async def tx_json(self, msg_type:str, d: dict = {}):
"""Transmit a json-serializable dict to the peer"""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
await self.websocket.send(d_js)
async def tx_hello_ack(self):
await self.tx_json('hello_ack')
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
await self.tx_json(msg_type, d)
rx = await self.rx_json()
if exp_msg_type:
assert rx['msg_type'] == exp_msg_type
return rx;
async def tx_error(self, message: str):
"""Transmit an error message to the peer"""
event = {
"message": message,
}
await self.tx_json('error', event)
async def ws_hdlr(self):
"""kind of a 'main' function for the websocket client: wait for incoming message,
and handle it."""
try:
async for message in self.websocket:
method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
if not method:
await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
else:
method(message)
except ConnectionClosedError:
# we handle this in the outer loop
pass
class CardClient(WsClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 'init'
def __str__(self):
eid = self.identity.get('EID', None)
if eid:
return '%s(EID=%s)' % (self.__class__.__name__, eid)
iccid = self.identity.get('ICCID', None)
if iccid:
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
return super().__str__()
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
"""transceive a single APDU with the card"""
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return message['response'], message['sw']
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
prev_pdu = pdu
data, sw = await self.xceive_apdu_raw(pdu)
if sw is not None:
while (sw[0:2] in ['9f', '61', '62', '63']):
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
prev_pdu = pdu_gr
d, sw = await self.xceive_apdu_raw(pdu_gr)
data += d
if sw[0:2] == '6c':
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
pdu_gr = prev_pdu[0:8] + sw[2:4]
data, sw = await self.xceive_apdu_raw(pdu_gr)
return data, sw
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
"""like xceive_apdu, but checking the status word matches the expected pattern"""
rv = await self.xceive_apdu(pdu)
last_sw = rv[1]
if not sw_match(rv[1], sw):
raise SwMatchError(rv[1], sw.lower())
return rv
async def card_reset(self):
"""reset the card"""
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
async def get_iccid(self):
"""high-level method to obtain the ICCID of the card"""
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
return dec_iccid(res)
async def get_eid_sgp22(self):
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
return res[-32:]
async def identify(self):
# identify the card by asking for its EID and/or ICCID
try:
eid = await self.get_eid_sgp22()
logger.debug("EID: %s", eid)
self.identity['EID'] = eid
except SwMatchError:
pass
try:
iccid = await self.get_iccid()
logger.debug("ICCID: %s", iccid)
self.identity['ICCID'] = iccid
except SwMatchError:
pass
logger.info("Card now in READY state")
self.state = 'ready'
@staticmethod
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
for c in card_clients:
print("testing card %s in state %s" % (c, c.state))
if c.state != 'ready':
continue
c_id = c.identity.get(id_type.upper(), None)
if c_id and c_id.lower() == id_str.lower():
return c
return None
class UserClient(WsClient):
"""A websocket client representing a user application like pySim-shell."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 'init'
async def state_init(self):
"""Wait for incoming 'select_card' and process it."""
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'select_card':
# look-up if the card can be found
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
if not card:
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
continue
# transition to next statee
self.state = 'associated'
card.state = 'associated'
self.card = card
await self.tx_json('select_card_ack', {'identities': card.identity})
break
else:
self.tx_error('Unknown message type %s' % rx['msg_type'])
async def state_selected(self):
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'c_apdu':
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
async def ws_conn_hdlr(websocket):
rx_raw = await websocket.recv()
rx = json.loads(rx_raw)
assert rx['msg_type'] == 'hello'
client_type = rx['client_type']
logger.info("New client (type %s) connection accepted", client_type)
if client_type == 'card':
card = CardClient(websocket, rx)
await card.tx_hello_ack()
card_clients.add(card)
# first obtain the identity of the card
await card.identify()
# then go into the "main loop"
try:
await card.ws_hdlr()
finally:
logger.info("%s: connection closed", card)
card_clients.remove(card)
elif client_type == 'user':
user = UserClient(websocket, rx)
await user.tx_hello_ack()
user_clients.add(user)
# first wait for the user to specify the select the card
await user.state_init()
try:
await user.state_selected()
finally:
logger.info("%s: connection closed", user)
user_clients.remove(user)
else:
logger.info("Rejecting client (unknown type %s) connection", client_type)
raise ValueError
async def main():
async with serve(ws_conn_hdlr, "localhost", 8765):
await asyncio.get_running_loop().create_future() # run forever
asyncio.run(main())

206
contrib/fsdump2saip.py Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# This is a script to generate a [partial] eSIM profile from the 'fsdump' of another USIM/ISIM. This is
# useful to generate an "as close as possible" eSIM from a physical USIM, as far as that is possible
# programmatically and in a portable way.
#
# Of course, this really only affects the file sytem aspects of the card. It's not possible
# to read the K/OPc or other authentication related parameters off a random USIM, and hence
# we cannot replicate that. Similarly, it's not possible to export the java applets from a USIM,
# and hence we cannot replicate those.
import argparse
from pySim.esim.saip import *
from pySim.ts_102_221 import *
class FsdumpToSaip:
def __init__(self, pes: ProfileElementSequence):
self.pes = pes
@staticmethod
def fcp_raw2saip_fcp(fname:str, fcp_raw: bytes) -> Dict:
"""Convert a raw TLV-encoded FCP to a SAIP dict-format (as needed by asn1encode)."""
ftype = fname.split('.')[0]
# use the raw FCP as basis so we don't get stuck with potentially old decoder bugs
# ore future format incompatibilities
fcp = FcpTemplate()
fcp.from_tlv(fcp_raw)
r = {}
r['fileDescriptor'] = fcp.child_by_type(FileDescriptor).to_bytes()
file_id = fcp.child_by_type(FileIdentifier)
if file_id:
r['fileID'] = file_id.to_bytes()
else:
if ftype in ['ADF']:
print('%s is an ADF but has no [mandatory] file_id!' % fname)
#raise ValueError('%s is an ADF but has no [mandatory] file_id!' % fname)
r['fileID'] = b'\x7f\xff' # FIXME: auto-increment
df_name = fcp.child_by_type(DfName)
if ftype in ['ADF']:
if not df_name:
raise ValueError('%s is an ADF but has no [mandatory] df_name!' % fname)
r['dfName'] = df_name.to_bytes()
lcsi_byte = fcp.child_by_type(LifeCycleStatusInteger).to_bytes()
if lcsi_byte != b'\x05':
r['lcsi'] = lcsi_byte
sa_ref = fcp.child_by_type(SecurityAttribReferenced)
if sa_ref:
r['securityAttributesReferenced'] = sa_ref.to_bytes()
file_size = fcp.child_by_type(FileSize)
if ftype in ['EF']:
if file_size:
r['efFileSize'] = file_size.to_bytes()
psdo = fcp.child_by_type(PinStatusTemplate_DO)
if ftype in ['MF', 'ADF', 'DF']:
if not psdo:
raise ValueError('%s is an %s but has no [mandatory] PinStatusTemplateDO' % fname)
else:
r['pinStatusTemplateDO'] = psdo.to_bytes()
sfid = fcp.child_by_type(ShortFileIdentifier)
if sfid and sfid.decoded:
if ftype not in ['EF']:
raise ValueError('%s is not an EF but has [forbidden] shortEFID' % fname)
r['shortEFID'] = sfid.to_bytes()
pinfo = fcp.child_by_type(ProprietaryInformation)
if pinfo and ftype in ['EF']:
spinfo = pinfo.child_by_type(SpecialFileInfo)
fill_p = pinfo.child_by_type(FillingPattern)
repeat_p = pinfo.child_by_type(RepeatPattern)
# only exists for BER-TLV files
max_fsize = pinfo.child_by_type(MaximumFileSize)
if spinfo or fill_p or repeat_p or max_fsize:
r['proprietaryEFInfo'] = {}
if spinfo:
r['proprietaryEFInfo']['specialFileInformation'] = spinfo.to_bytes()
if fill_p:
r['proprietaryEFInfo']['fillPattern'] = fill_p.to_bytes()
if repeat_p:
r['proprietaryEFInfo']['repeatPattern'] = repeat_p.to_bytes()
if max_fsize:
r['proprietaryEFInfo']['maximumFileSize'] = max_fsize.to_bytes()
# TODO: linkPath
return r
@staticmethod
def fcp_fsdump2saip(fsdump_ef: Dict):
"""Convert a file from its "fsdump" representation to the SAIP representation of a File type
in the decoded format as used by the asn1tools-generated codec."""
# first convert the FCP
path = fsdump_ef['path']
fdesc = FsdumpToSaip.fcp_raw2saip_fcp(path[-1], h2b(fsdump_ef['fcp_raw']))
r = [
('fileDescriptor', fdesc),
]
# then convert the body. We're doing a straight-forward conversion without any optimization
# like not encoding all-ff files. This should be done by a subsequent optimizer
if 'body' in fsdump_ef and fsdump_ef['body']:
if isinstance(fsdump_ef['body'], list):
for b_seg in fsdump_ef['body']:
r.append(('fillFileContent', h2b(b_seg)))
else:
r.append(('fillFileContent', h2b(fsdump_ef['body'])))
print(fsdump_ef['path'])
return r
def add_file_from_fsdump(self, fsdump_ef: Dict):
fid = int(fsdump_ef['fcp']['file_identifier'])
# determine NAA
if fsdump_ef['path'][0:1] == ['MF', 'ADF.USIM']:
naa = NaaUsim
elif fsdump_ef['path'][0:1] == ['MF', 'ADF.ISIM']:
naa = NaaIsim
else:
print("Unable to determine NAA for %s" % fsdump_ef['path'])
return
pes.pes_by_naa[naa.name]
for pe in pes:
print("PE %s" % pe)
if not isinstance(pe, FsProfileElement):
print("Skipping PE %s" % pe)
continue
if not pe.template:
print("No template for PE %s" % pe )
continue
if not fid in pe.template.files_by_fid:
print("File %04x not available in template; must create it using GenericFileMgmt" % fid)
parser = argparse.ArgumentParser()
parser.add_argument('fsdump', help='')
def has_unsupported_path_prefix(path: List[str]) -> bool:
# skip some paths from export as they don't exist in an eSIM profile
UNSUPPORTED_PATHS = [
['MF', 'DF.GSM'],
]
for p in UNSUPPORTED_PATHS:
prefix = path[:len(p)]
if prefix == p:
return True
# any ADF not USIM or ISIM are unsupported
SUPPORTED_ADFS = [ 'ADF.USIM', 'ADF.ISIM' ]
if len(path) == 2 and path[0] == 'MF' and path[1].startswith('ADF.') and path[1] not in SUPPORTED_ADFS:
return True
return False
import traceback
if __name__ == '__main__':
opts = parser.parse_args()
with open(opts.fsdump, 'r') as f:
fsdump = json.loads(f.read())
pes = ProfileElementSequence()
# FIXME: fsdump has strting-name path, but we need FID-list path for ProfileElementSequence
for path, fsdump_ef in fsdump['files'].items():
print("=" * 80)
#print(fsdump_ef)
if not 'fcp_raw' in fsdump_ef:
continue
if has_unsupported_path_prefix(fsdump_ef['path']):
print("Skipping eSIM-unsupported path %s" % ('/'.join(fsdump_ef['path'])))
continue
saip_dec = FsdumpToSaip.fcp_fsdump2saip(fsdump_ef)
#print(saip_dec)
try:
f = pes.add_file_at_path(Path(path), saip_dec)
print(repr(f))
except Exception as e:
print("EXCEPTION: %s" % traceback.format_exc())
#print("EXCEPTION: %s" % e)
continue
print("=== Tree === ")
pes.mf.print_tree()
# FIXME: export the actual PE Sequence

View File

@@ -48,6 +48,7 @@ pySim consists of several parts:
sim-rest
suci-keytool
saip-tool
wsrc
Indices and tables

31
docs/wsrc.rst Normal file
View File

@@ -0,0 +1,31 @@
WebSocket Remote Card (WSRC)
============================
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
method was chosen to be as firewall/NAT friendly as possible.
WSRC Network Architecture
-------------------------
In a WSRC network, there are three major elements:
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
to a remote *WSRC Server*
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
WSRC Protocol
-------------
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
is based on HTTP and should usually operate via TLS for security reasons.
The detailed protocol is currently still WIP. The plan is to document it here.
pySim implementations
---------------------
TBD

154
ota_test.py Executable file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/python3
from pySim.ota import *
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
from pySim.utils import h2b, h2b
# pre-defined SPI values for use in test cases below
SPI_CC_POR_CIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':True,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_NOCC = {
'counter':'no_counter',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'no_rc_cc_ds',
'por': 'por_required'
}
# SJA5 SAMPLE cards provisioned by execute_ipr.py
OTA_KEYSET_SJA5_SAMPLES = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3,
algo_auth='triple_des_cbc2', kid_idx=3,
kic=h2b('300102030405060708090a0b0c0d0e0f'),
kid=h2b('301102030405060708090a0b0c0d0e0f'))
OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
algo_auth='aes_cmac', kid_idx=2,
kic=h2b('200102030405060708090a0b0c0d0e0f'),
kid=h2b('201102030405060708090a0b0c0d0e0f'))
# TS.48 profile on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
algo_auth='aes_cmac', kid_idx=2,
kic=h2b('66778899AABBCCDD1122334455EEFF10'),
kid=h2b('112233445566778899AABBCCDDEEFF10'))
# ISD-R on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128_ISDR = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('B52F9C5938D1C19ED73E1AE772937FD7'),
kid=h2b('3BC696ACD1EEC95A6624F7330D22FC81'))
# ISD-A on sysmoEUICC1-C2G
OTA_KEYSET_C2G_AES128_ISDA = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('8DAAD1DAAA8D7C9000E3BBED8B7556E7'),
kid=h2b('5392D503AE050DDEAF81AFAEFF275A2B'))
# TODO: AES192
# TODO: AES256
testcases = [
{
'name': '3DES-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
},
'response': {
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
},
'response': {
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-NOCC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
},
'response': {
'encoded_resp': '027100000e0ab0001100000000000000016132',
}
}, {
'name': 'AES128-C2G-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_C2G_AES128_ISDR,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
#'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'apdu': h2b('80ec800300'),
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
},
'response': {
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
}
}
]
for t in testcases:
print()
print("==== TESTCASE: %s" % t['name'])
od = t['ota_keyset']
# RAM: B00000
# SIM RFM: B00010
# USIM RFM: B00011
# ISD-R: 000001
# ECASD: 000002
tar = h2b('000001')
dialect = OtaDialectSms()
outp = dialect.encode_cmd(od, tar, t['spi'], apdu=t['request']['apdu'])
print("result: %s" % b2h(outp))
#assert(b2h(outp) == t['request']['encoded_cmd'])
with_udh = b'\x02\x70\x00' + outp
print("with_udh: %s" % b2h(with_udh))
# processing of the response from the card
da = AddressField('12345678', 'unknown', 'isdn_e164')
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
print("TPDU: %s" % tpdu)
print("tpdu: %s" % b2h(tpdu.to_bytes()))
#assert(b2h(tpdu.to_bytes()) == t['request']['encoded_tpdu'])
r = dialect.decode_resp(od, t['spi'], t['response']['encoded_resp'])
print("RESP: ", r)

View File

@@ -23,6 +23,7 @@ from pySim.apdu_source.gsmtap import GsmtapApduSource
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap, PysharkRsproLive
from pySim.apdu_source.pyshark_gsmtap import PysharkGsmtapPcap
from pySim.apdu_source.tca_loader_log import TcaLoaderLogApduSource
from pySim.apdu_source.stdin_hex import StdinHexApduSource
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
@@ -32,10 +33,11 @@ logger = colorlog.getLogger()
# merge all of the command sets into one global set. This will override instructions,
# the one from the 'last' set in the addition below will prevail.
from pySim.apdu.ts_51_011 import ApduCommands as SimApduCommands
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
ApduCommands = SimApduCommands + UiccApduCommands + UsimApduCommands #+ GpApduCommands
class DummySimLink(LinkBase):
@@ -190,6 +192,10 @@ parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
help='Name of te log file to be read')
parser_stdin_hex = subparsers.add_parser('stdin-hex', help="""
Read APDUs as hex-string from stdin.""")
if __name__ == '__main__':
opts = option_parser.parse_args()
@@ -205,6 +211,8 @@ if __name__ == '__main__':
s = PysharkGsmtapPcap(opts.pcap_file)
elif opts.source == 'tca-loader-log':
s = TcaLoaderLogApduSource(opts.log_file)
elif opts.source == 'stdin-hex':
s = StdinHexApduSource()
else:
raise ValueError("unsupported source %s", opts.source)

339
pySim/apdu/ts_51_011.py Normal file
View File

@@ -0,0 +1,339 @@
# coding=utf-8
"""APDU definitions/decoders of 3GPP TS 51.011, the classic SIM spec.
(C) 2022 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
from construct import GreedyRange, Struct
from pySim.construct import *
from pySim.filesystem import *
from pySim.runtime import RuntimeLchan
from pySim.apdu import ApduCommand, ApduCommandSet
from typing import Optional, Dict, Tuple
logger = logging.getLogger(__name__)
# TS 51.011 Section 9.2.1
class SimSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['A0']):
_apdu_case = 4
def process_on_lchan(self, lchan: RuntimeLchan):
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
for file in path:
file_hex = b2h(file)
sels = lchan.selected_file.get_selectables(['FIDS'])
if file_hex in sels:
if self.successful:
#print("\tSELECT %s" % sels[file_hex])
lchan.selected_file = sels[file_hex]
else:
#print("\tSELECT %s FAILED" % sels[file_hex])
pass
continue
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
if len(self.cmd_data) != 2:
raise ValueError('Expecting a 2-byte FID')
# decode the SELECT response
if self.successful:
self.file = lchan.selected_file
if 'body' in self.rsp_dict:
# not every SELECT is asking for the FCP in response...
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
return None
# TS 51.011 Section 9.2.2
class SimStatus(ApduCommand, n='STATUS', ins=0xF2, cla=['A0']):
_apdu_case = 2
def process_on_lchan(self, lchan):
if self.successful:
if 'body' in self.rsp_dict:
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
def _decode_binary_p1p2(p1, p2) -> Dict:
ret = {}
if p1 & 0x80:
ret['file'] = 'sfi'
ret['sfi'] = p1 & 0x1f
ret['offset'] = p2
else:
ret['file'] = 'currently_selected_ef'
ret['offset'] = ((p1 & 0x7f) << 8) & p2
return ret
# TS 51.011 Section 9.2.3 / 31.101
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short reads
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.4 / 31.101
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short writes
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
def _decode_record_p1p2(p1, p2):
ret = {}
ret['record_number'] = p1
if p2 >> 3 == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = p2 >> 3
mode = p2 & 0x7
if mode == 2:
ret['mode'] = 'next_record'
elif mode == 3:
ret['mode'] = 'previous_record'
elif mode == 8:
ret['mode'] = 'absolute_current'
return ret
# TS 51.011 Section 9.2.5
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.6
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
# TS 51.011 Section 9.2.7
class Seek(ApduCommand, n='SEEK', ins=0xA2, cla=['A0']):
_apdu_case = 4
_construct_rsp = GreedyRange(Int8ub)
def _decode_p1p2(self):
ret = {}
sfi = self.p2 >> 3
if sfi == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = sfi
mode = self.p2 & 0x7
if mode in [0x4, 0x5]:
if mode == 0x4:
ret['mode'] = 'forward_search'
else:
ret['mode'] = 'backward_search'
ret['record_number'] = self.p1
self.col_id = '%02u' % ret['record_number']
elif mode == 6:
ret['mode'] = 'enhanced_search'
# TODO: further decode
elif mode == 7:
ret['mode'] = 'proprietary_search'
return ret
def _decode_cmd(self):
ret = self._decode_p1p2()
if self.cmd_data:
if ret['mode'] == 'enhanced_search':
ret['search_indication'] = b2h(self.cmd_data[:2])
ret['search_string'] = b2h(self.cmd_data[2:])
else:
ret['search_string'] = b2h(self.cmd_data)
return ret
def process_on_lchan(self, lchan):
self._determine_file(lchan)
return self.to_dict()
# TS 51.011 Section 9.2.8
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['A0']):
_apdu_case = 4
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
# TS 51.011 Section 9.2.9
class VerifyChv(ApduCommand, n='VERIFY CHV', ins=0x20, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
@staticmethod
def _pin_process(apdu):
processed = {
'scope': apdu.cmd_dict['p2']['scope'],
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
}
if apdu.lc == 0:
# this is just a question on the counters remaining
processed['mode'] = 'check_remaining_attempts'
else:
processed['pin'] = b2h(apdu.cmd_data)
if apdu.sw[0] == 0x63:
processed['remaining_attempts'] = apdu.sw[1] & 0xf
return processed
@staticmethod
def _pin_is_success(sw):
if sw[0] == 0x63:
return True
else:
return False
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.10
class ChangeChv(ApduCommand, n='CHANGE CHV', ins=0x24, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.11
class DisableChv(ApduCommand, n='DISABLE CHV', ins=0x26, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.12
class EnableChv(ApduCommand, n='ENABLE CHV', ins=0x28, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.13
class UnblockChv(ApduCommand, n='UNBLOCK CHV', ins=0x2C, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyChv._pin_process(self)
def _is_success(self):
return VerifyChv._pin_is_success(self.sw)
# TS 51.011 Section 9.2.14
class Invalidate(ApduCommand, n='INVALIDATE', ins=0x04, cla=['A0']):
_apdu_case = 1
_construct_p1 = BitStruct(BitsInteger(4),
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
path_from_mf=8, path_from_current_df=9))
# TS 51.011 Section 9.2.15
class Rehabilitate(ApduCommand, n='REHABILITATE', ins=0x44, cla=['A0']):
_apdu_case = 1
_construct_p1 = Invalidate._construct_p1
# TS 51.011 Section 9.2.16
class RunGsmAlgorithm(ApduCommand, n='RUN GSM ALGORITHM', ins=0x88, cla=['A0']):
_apdu_case = 4
_construct = Struct('rand'/Bytes(16))
_construct_rsp = Struct('sres'/Bytes(4), 'kc'/Bytes(8))
# TS 51.011 Section 9.2.17
class Sleep(ApduCommand, n='SLEEP', ins=0xFA, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.18
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.19
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['A0']):
_apdu_case = 3
# TS 51.011 Section 9.2.20
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['A0']):
_apdu_case = 4
# TS 51.011 Section 9.2.21
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.22
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['A0']):
_apdu_case = 3
ApduCommands = ApduCommandSet('TS 51.011', cmds=[SimSelect, SimStatus, ReadBinary, UpdateBinary, ReadRecord,
UpdateRecord, Seek, Increase, VerifyChv, ChangeChv, DisableChv,
EnableChv, UnblockChv, Invalidate, Rehabilitate, RunGsmAlgorithm,
Sleep, GetResponse, TerminalProfile, Envelope, Fetch, TerminalResponse])

View File

@@ -0,0 +1,39 @@
# coding=utf-8
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.utils import h2b
from pySim.gsmtap import GsmtapSource
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_102_222 import ApduCommands as UiccAdmApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UiccAdmApduCommands + UsimApduCommands + GpApduCommands
class StdinHexApduSource(ApduSource):
"""ApduSource for reading apdu hex-strings from stdin."""
def read_packet(self) -> PacketType:
while True:
command = input("C-APDU >")
response = '9000'
return ApduCommands.parse_cmd_bytes(h2b(command) + h2b(response))
raise StopIteration

View File

@@ -31,6 +31,7 @@ from pySim.exceptions import SwMatchError
# CardModel is created, which will add the ATR-based matching and
# calling of SysmocomSJA2.add_files. See CardModel.apply_matching_models
import pySim.sysmocom_sja2
#import pySim.sysmocom_euicc1
# we need to import these modules so that the various sub-classes of
# CardProfile are created, which will be used in init_card() to iterate

View File

@@ -2,7 +2,7 @@
mainly) ETSI TS 102 223, ETSI TS 101 220 and USIM Application Toolkit (SAT)
as described in 3GPP TS 31.111."""
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
# (C) 2021-2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by

View File

@@ -21,6 +21,8 @@ import io
import os
from typing import Tuple, List, Optional, Dict, Union
from collections import OrderedDict
from difflib import SequenceMatcher, Match
import asn1tools
import zipfile
from pySim import javacard
@@ -44,6 +46,29 @@ asn1 = compile_asn1_subdir('saip')
logger = logging.getLogger(__name__)
class NonMatch(Match):
"""Representing a contiguous non-matching block of data; the opposite of difflib.Match"""
@classmethod
def from_matchlist(cls, l: List[Match], size:int) -> List['NonMatch']:
"""Build a list of non-matching blocks of data from its inverse (list of matching blocks).
The caller must ensure that the input list is ordered, non-overlapping and only contains
matches at equal offsets in a and b."""
res = []
cur = 0
for match in l:
if match.a != match.b:
return ValueError('only works for equal-offset matches')
assert match.a >= cur
nm_len = match.a - cur
if nm_len > 0:
# there's no point in generating zero-lenth non-matching sections
res.append(cls(a=cur, b=cur, size=nm_len))
cur = match.a + match.size
if size > cur:
res.append(cls(a=cur, b=cur, size=size-cur))
return res
class Naa:
"""A class defining a Network Access Application (NAA)"""
name = None
@@ -406,12 +431,35 @@ class File:
return ValueError("Unknown key '%s' in tuple list" % k)
return stream.getvalue()
def file_content_to_tuples(self) -> List[Tuple]:
# FIXME: simplistic approach. needs optimization. We should first check if the content
# matches the expanded default value from the template. If it does, return empty list.
# Next, we should compute the diff between the default value and self.body, and encode
# that as a sequence of fillFileOffset and fillFileContent tuples.
return [('fillFileContent', self.body)]
def file_content_to_tuples(self, optimize:bool = True) -> List[Tuple]:
if not self.file_type in ['TR', 'LF', 'CY', 'BT']:
return []
if not optimize:
# simplistic approach: encode the full file, ignoring the template/default
return [('fillFileContent', self.body)]
# Try to 'compress' the file body, based on the default file contents.
if self.template:
default = self.template.expand_default_value_pattern(length=len(self.body))
if not default:
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
else:
if default == self.body:
# 100% match: return an empty tuple list to make eUICC use the default
return []
sm = SequenceMatcher(a=default, b=self.body)
else:
# no template at all: we can only remove padding
sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
matching_blocks = sm.get_matching_blocks()
# we can only make use of matches that have the same offset in 'a' and 'b'
matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
ret = []
cur = 0
for block in non_matching_blocks:
ret.append(('fillFileOffset', block.a - cur))
ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
return ret
def __str__(self) -> str:
return "File(%s)" % self.pe_name

View File

@@ -302,7 +302,7 @@ class OtaAlgoAuthDES3(OtaAlgoAuth):
class OtaAlgoCryptAES(OtaAlgoCrypt):
name = 'AES'
enum_name = 'aes_cbc'
blocksize = 16 # TODO: is this needed?
blocksize = 16
def _encrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.encrypt(data)
@@ -356,20 +356,20 @@ class OtaDialectSms(OtaDialect):
# CHL + SPI (+ KIC + KID)
part_head = self.hdr_construct.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
#print("part_head: %s" % b2h(part_head))
print("part_head: %s" % b2h(part_head))
# CNTR + PCNTR (CNTR not used)
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
#print("part_cnt: %s" % b2h(part_cnt))
print("part_cnt: %s" % b2h(part_cnt))
envelope_data = part_head + part_cnt + apdu
#print("envelope_data: %s" % b2h(envelope_data))
print("envelope_data: %s" % b2h(envelope_data))
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
# CPL from and including CPI to end of secured data, including any padding for ciphering
cpl = len(envelope_data) + len_sig
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
#print("envelope_data with cpl: %s" % b2h(envelope_data))
print("envelope_data with cpl: %s" % b2h(envelope_data))
if spi['rc_cc_ds'] == 'cc':
cc = otak.auth.sign(envelope_data)
@@ -383,7 +383,7 @@ class OtaDialectSms(OtaDialect):
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
#print("envelope_data with sig: %s" % b2h(envelope_data))
print("envelope_data with sig: %s" % b2h(envelope_data))
# encrypt as needed
if spi['ciphering']: # ciphering is requested
@@ -395,7 +395,7 @@ class OtaDialectSms(OtaDialect):
else:
envelope_data = part_head + envelope_data
#print("envelope_data: %s" % b2h(envelope_data))
print("envelope_data: %s" % b2h(envelope_data))
if len(envelope_data) > 140:
raise ValueError('Cannot encode command in a single SMS; Fragmentation not implemented')
@@ -492,6 +492,7 @@ class OtaDialectSms(OtaDialect):
else:
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
print(res)
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
if res.response_status == 'por_ok' and len(res['secured_data']):
dec = CompactRemoteResp.parse(res['secured_data'])

View File

@@ -30,6 +30,13 @@ from smpp.pdu import pdu_types, operations
BytesOrHex = typing.Union[Hexstr, bytes]
# 07
# 00 03 000201 # part 01 of 02 in reference 00
# 70 00
# 05
# 00 03 000202
class UserDataHeader:
# a single IE in the user data header
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))

View File

@@ -162,6 +162,28 @@ class EF_SIM_AUTH_KEY(TransparentEF):
'key'/Bytes(16),
'op_opc' /Bytes(16))
class EF_HTTPS_CFG(TransparentEF):
def __init__(self, fid='6f2a', name='EF.HTTPS_CFG'):
super().__init__(fid, name=name, desc='HTTPS configuration')
class EF_HTTPS_KEYS(TransparentEF):
KeyRecord = Struct('security_domain'/Int8ub,
'key_type'/Enum(Int8ub, des=0x80, psk=0x85, aes=0x88),
'key_version'/Int8ub,
'key_id'/Int8ub,
'key_length'/Int8ub,
'key'/Bytes(this.key_length))
def __init__(self, fid='6f2b', name='EF.HTTPS_KEYS'):
super().__init__(fid, name=name, desc='HTTPS PSK and DEK keys')
self._construct = GreedyRange(self.KeyRecord)
class EF_HTTPS_POLL(TransparentEF):
TimeUnit = Enum(Int8ub, seconds=0, minutes=1, hours=2, days=3, ten_days=4)
def __init__(self, fid='6f2c', name='EF.HTTPS_POLL'):
super().__init__(fid, name=name, desc='HTTPS polling interval')
self._construct = Struct(Const(b'\x82'), 'time_unit'/self.TimeUnit, 'value'/Int8ub,
'adm_session_triggering_tlv'/GreedyBytes)
class DF_SYSTEM(CardDF):
def __init__(self):
@@ -180,6 +202,9 @@ class DF_SYSTEM(CardDF):
EF_0348_COUNT(),
EF_GP_COUNT(),
EF_GP_DIV_DATA(),
EF_HTTPS_CFG(),
EF_HTTPS_KEYS(),
EF_HTTPS_POLL(),
]
self.add_files(files)

View File

@@ -333,11 +333,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
from pySim.transport.pcsc import PcscSimLink
from pySim.transport.modem_atcmd import ModemATCommandLink
from pySim.transport.calypso import CalypsoSimLink
from pySim.transport.wsrc import WsrcSimLink
SerialSimLink.argparse_add_reader_args(arg_parser)
PcscSimLink.argparse_add_reader_args(arg_parser)
ModemATCommandLink.argparse_add_reader_args(arg_parser)
CalypsoSimLink.argparse_add_reader_args(arg_parser)
WsrcSimLink.argparse_add_reader_args(arg_parser)
arg_parser.add_argument('--apdu-trace', action='store_true',
help='Trace the command/response APDUs exchanged with the card')
@@ -360,6 +362,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
elif opts.modem_dev is not None:
from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs)
elif opts.wsrc_server_url is not None:
from pySim.transport.wsrc import WsrcSimLink
sl = WsrcSimLink(opts, **kwargs)
else: # Serial reader is default
print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink

101
pySim/transport/wsrc.py Normal file
View File

@@ -0,0 +1,101 @@
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from typing import Optional
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
from pySim.transport import LinkBase
from pySim.utils import ResTuple
from pySim.wsrc.client_blocking import WsClientBlocking
class UserWsClientBlocking(WsClientBlocking):
def perform_outbound_hello(self):
hello_data = {
'client_type': 'user',
}
super().perform_outbound_hello(hello_data)
def select_card(self, id_type:str, id_str:str):
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
'select_card_ack')
return rx
def reset_card(self):
self.transceive_json('reset_req', {}, 'reset_resp')
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return rx['response'], rx['sw']
class WsrcSimLink(LinkBase):
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
name = 'WSRC'
def __init__(self, opts: argparse.Namespace, **kwargs):
super().__init__(**kwargs)
self.identities = {}
self.server_url = opts.wsrc_server_url
if opts.wsrc_eid:
self.id_type = 'eid'
self.id_str = opts.wsrc_eid
elif opts.wsrc_iccid:
self.id_type = 'iccid'
self.id_str = opts.wsrc_iccid
self.client = UserWsClientBlocking(self.server_url)
self.client.connect()
def __del__(self):
# FIXME: disconnect from server
pass
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
self.connect()
def connect(self):
rx = self.client.select_card(self.id_type, self.id_str)
self.identitites = rx['identities']
def get_atr(self) -> Hexstr:
return self.identities['atr']
def disconnect(self):
self.__delete__()
def _reset_card(self):
self.client.reset_card()
return 1
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
return self.client.xceive_apdu_raw(pdu)
def __str__(self) -> str:
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
@staticmethod
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
can be accessed via a network.""")
wsrc_group.add_argument('--wsrc-server-url',
help='URI of the WSRC server to connect to')
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
help='ICCID of the card to open via WSRC')
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
help='EID of the card to open via WSRC')

0
pySim/wsrc/__init__.py Normal file
View File

View File

@@ -0,0 +1,65 @@
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import abc
import json
import logging
from typing import Optional
from websockets.sync.client import connect
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
class WsClientBlocking(abc.ABC):
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
def __init__(self, ws_uri: str):
self.ws_uri = ws_uri
self.ws = None
def connect(self):
self.ws = connect(uri=self.ws_uri)
self.perform_outbound_hello()
def tx_json(self, msg_type: str, d: dict = {}):
"""JSON-Encode and transmit a message to the given websocket."""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
self.ws.send(d_js)
def tx_error(self, message: str):
event = {
'message': message,
}
self.tx_json('error', event)
def rx_json(self):
"""Receive a single message from the given websocket and JSON-decode it."""
rx = self.ws.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
self.tx_json(tx_msg_type, tx_d)
rx = self.rx_json()
assert rx['msg_type'] == rx_msg_type
return rx
def perform_outbound_hello(self, tx: dict = {}):
self.tx_json('hello', tx)
rx = self.rx_json()
assert rx['msg_type'] == 'hello_ack'
return rx
def rx_and_execute_cmd(self):
"""Receve and dispatch/execute a single command from the server."""
rx = self.rx_json()
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
if handler:
handler(rx)
else:
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])

View File

@@ -15,3 +15,4 @@ git+https://github.com/osmocom/asn1tools
packaging
git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
smpplib

142
smpp_ota_apdu2.py Executable file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
import logging
import sys
from pprint import pprint as pp
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
import smpplib.gsm
import smpplib.client
import smpplib.consts
logger = logging.getLogger(__name__)
# if you want to know what's happening
logging.basicConfig(level='DEBUG')
class Foo:
def smpp_rx_handler(self, pdu):
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
if pdu.short_message:
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
pp(dec)
return None
def __init__(self):
# Two parts, UCS2, SMS with UDH
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
# Print when obtain message_id
client.set_message_sent_handler(
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
#client.set_message_received_handler(
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
client.set_message_received_handler(self.smpp_rx_handler)
client.connect()
client.bind_transceiver(system_id='test', password='test')
self.client = client
if False:
KIC1 = h2b('000102030405060708090a0b0c0d0e0f')
KID1 = h2b('101112131415161718191a1b1c1d1e1f')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
if False:
KIC1 = h2b('4BE2D58A1FA7233DD723B3C70996E6E6')
KID1 = h2b('4a664208eba091d32c4ecbc299da1f34')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
#KIC3 = h2b('A4074D8E1FE69B484A7E62682ED09B51')
#KID3 = h2b('41FF1033910112DB4EBEBB7807F939CD')
#self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
# algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
#self.tar = h2b('B00011') # USIM RFM
self.tar = h2b('000000') # RAM
if False: # sysmoEUICC1-C2G
KIC1 = h2b('B52F9C5938D1C19ED73E1AE772937FD7')
KID1 = h2b('3BC696ACD1EEC95A6624F7330D22FC81')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.tar = h2b('000001') # ISD-R according to Annex H of SGP.02
#self.tar = h2b('000002') # ECASD according to Annex H of SGP.02
if False: # TS.48 profile
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
KID1 = h2b('112233445566778899aabbccddeeff10')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=1, kic=KIC1,
algo_auth='triple_des_cbc2', kid_idx=1, kid=KID1)
self.tar = h2b('000000') # ISD-P according to FIXME
if False: # TS.48 profile AES
KIC1 = h2b('66778899aabbccdd1122334455eeff10')
KID1 = h2b('112233445566778899aabbccddeeff10')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2, kic=KIC1,
algo_auth='aes_cmac', kid_idx=2, kid=KID1)
self.tar = h2b('000000') # ISD-P according to FIXME
if True: # eSIM profile
KIC1 = h2b('207c5d2c1aa80d58cd8f542fb9ef2f80')
KID1 = h2b('312367f1681902fd67d9a71c62a840e3')
#KIC1 = h2b('B6B0E130EEE1C9A4A29DAEC034D35C9E')
#KID1 = h2b('889C51CD2A381A9D4EDDA9224C24A9AF')
self.ota_keyset = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1, kic=KIC1,
algo_auth='aes_cmac', kid_idx=1, kid=KID1)
self.ota_keyset.cntr = 7
self.tar = h2b('b00001') # ADF.USIM
self.ota_dialect = OtaDialectSms()
self.spi = {'counter':'counter_must_be_higher', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def tx_sms_tpdu(self, tpdu: bytes):
self.client.send_message(
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure it is a byte string, not unicode:
source_addr='12',
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure thease two params are byte strings, not unicode:
destination_addr='23',
short_message=tpdu,
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
#registered_delivery=True,
)
def tx_c_apdu(self, apdu: bytes):
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
f = Foo()
print("initialized")
#f.tx_c_apdu(h2b('80a40400023f00'))
#f.tx_c_apdu(h2b('80EC010100'))
#f.tx_c_apdu(h2b('80EC0101' + '0E' + '350103' + '390203e8' + '3e052101020304'))
f.tx_c_apdu(h2b('00a40004026f07' + '00b0000009'))
f.client.listen()

View File

@@ -90,5 +90,34 @@ class OidTest(unittest.TestCase):
self.assertTrue(oid.OID('1.0.1') > oid.OID('1.0'))
self.assertTrue(oid.OID('1.0.2') > oid.OID('1.0.1'))
class NonMatchTest(unittest.TestCase):
def test_nonmatch(self):
# non-matches before, in between and after matches
match_list = [Match(a=10, b=10, size=5), Match(a=20, b=20, size=4)]
nm_list = NonMatch.from_matchlist(match_list, 26)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=10), NonMatch(a=15, b=15, size=5),
NonMatch(a=24, b=24, size=2)])
def test_nonmatch_beg(self):
# single match at beginning
match_list = [Match(a=0, b=0, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 20)
self.assertEqual(nm_list, [NonMatch(a=5, b=5, size=15)])
def test_nonmatch_end(self):
# single match at end
match_list = [Match(a=19, b=19, size=5)]
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=19)])
def test_nonmatch_none(self):
# no match at all
match_list = []
nm_list = NonMatch.from_matchlist(match_list, 24)
self.assertEqual(nm_list, [NonMatch(a=0, b=0, size=24)])
if __name__ == "__main__":
unittest.main()

View File

@@ -16,6 +16,16 @@ SPI_CC_POR_CIPHERED_CC = {
'por': 'por_required'
}
SPI_CC_CTR_POR_UNCIPHERED_CC = {
'counter':'counter_must_be_higher',
'ciphering':True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'cc',
'por': 'por_required'
}
SPI_CC_POR_UNCIPHERED_CC = {
'counter':'no_counter',
'ciphering':True,
@@ -36,6 +46,17 @@ SPI_CC_POR_UNCIPHERED_NOCC = {
'por': 'por_required'
}
SPI_CC_UNCIPHERED = {
'counter':'no_counter',
'ciphering':False,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por_shall_be_ciphered':False,
'por_rc_cc_ds': 'no_rc_cc_ds',
'por': 'por_required'
}
######################################################################
# old-style code-driven test (lots of code copy+paste)
######################################################################
@@ -74,6 +95,48 @@ class Test_SMS_AES128(unittest.TestCase):
self.assertEqual(b2h(dec_tar), b2h(self.tar))
self.assertEqual(dec_spi, spi)
def test_open_channel(self):
spi = self.spi_base
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('000102030405060708090a0b0c0d0e0f'),
kid=h2b('101112131415161718191a1b1c1d1e1f'))
tpdu = h2b('40048111227ff6407070611535007e070003000201700000781516011212000001ccda206e8b0d46304247bf00bfdc9853eed2a826f9af8dc7c2974ce2cb9bb55cc1a8577e047cc8f5d450380ba86b25354fe69f58884f671d7ace0c911f7c74830dc1d58b62cce4934568697ba1f577eecbca26c5dbfa32b0e2f0877948a9fb46a122e4214947386f467de11c')
#'40048111227ff6407070611535000a0500030002027b6abed3'
submit = SMS_DELIVER.from_bytes(tpdu)
submit.tp_udhi = True
print(submit)
#print("UD: %s" % b2h(submit.tp_ud))
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
print("UDHD: %s" % udhd)
print("DATA: %s" % b2h(data))
tpdu2 = h2b('40048111227ff6407070611535000a0500030002027b6abed3')
submit2 = SMS_DELIVER.from_bytes(tpdu2)
print(submit2)
udhd2, data2 = UserDataHeader.from_bytes(submit2.tp_ud)
print("UDHD: %s" % udhd2)
print("DATA: %s" % b2h(data2))
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data + data2)
print(b2h(dec_tar), b2h(dec_apdu))
def test_open_channel2(self):
spi = self.spi_base
self.od = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('000102030405060708090a0b0c0d0e0f'),
kid=h2b('101112131415161718191a1b1c1d1e1f'))
tpdu = h2b('40048111227ff6407070611535004d02700000481516011212000001fe4c0943aea42e45021c078ae06c66afc09303608874b72f58bacadb0dcf665c29349c799fbb522e61709c9baf1890015e8e8e196e36153106c8b92f95153774')
submit = SMS_DELIVER.from_bytes(tpdu)
submit.tp_udhi = True
print(submit)
#print("UD: %s" % b2h(submit.tp_ud))
#print("len(UD)=%u, UDL=%u" % (len(submit.tp_ud), submit.tp_udl))
udhd, data = UserDataHeader.from_bytes(submit.tp_ud)
print("UDHD: %s" % udhd)
print("DATA: %s" % b2h(data))
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, data)
print(b2h(dec_tar), b2h(dec_apdu))
class Test_SMS_3DES(unittest.TestCase):
tar = h2b('b00000')
@@ -178,6 +241,13 @@ OTA_KEYSET_SJA5_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=2,
kic=h2b('200102030405060708090a0b0c0d0e0f'),
kid=h2b('201102030405060708090a0b0c0d0e0f'))
OTA_KEYSET_eSIM_AES128 = OtaKeyset(algo_crypt='aes_cbc', kic_idx=1,
algo_auth='aes_cmac', kid_idx=1,
kic=h2b('207c5d2c1aa80d58cd8f542fb9ef2f80'),
kid=h2b('312367f1681902fd67d9a71c62a840e3'))
OTA_KEYSET_eSIM_AES128.cntr = 2
class OtaTestCase(unittest.TestCase):
def __init__(self, methodName='runTest', **kwargs):
super().__init__(methodName, **kwargs)
@@ -185,6 +255,7 @@ class OtaTestCase(unittest.TestCase):
# SIM RFM: B00010
# USIM RFM: B00011
self.tar = h2b('B00011')
self.tar = h2b('B00000')
class SmsOtaTestCase(OtaTestCase):
# Array describing the input/output data for the tests. We use the
@@ -193,70 +264,102 @@ class SmsOtaTestCase(OtaTestCase):
# manually writing one class per test.
testdatasets = [
{
'name': '3DES-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_CIPHERED_CC,
# 'name': '3DES-SJA5-CIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_CIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
# },
# 'response': {
# 'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': '3DES-SJA5-UNCIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_UNCIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
# },
# 'response': {
# 'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': '3DES-SJA5-UNCIPHERED-NOCC',
# 'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
# 'spi': SPI_CC_POR_UNCIPHERED_NOCC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
# },
# 'response': {
# 'encoded_resp': '027100000e0ab0001100000000000000016132',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
# 'name': 'AES128-SJA5-CIPHERED-CC',
# 'ota_keyset': OTA_KEYSET_SJA5_AES128,
# 'spi': SPI_CC_POR_CIPHERED_CC,
# 'request': {
# 'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
# 'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
# 'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
# },
# 'response': {
# 'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
# 'response_status': 'por_ok',
# 'number_of_commands': 1,
# 'last_status_word': '6132',
# 'last_response_data': '',
# }
# }, {
'name': 'AES128-eSIM-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_eSIM_AES128,
'spi': SPI_CC_UNCIPHERED,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506193535b00011ae733256918d050b87c94fbfe12e4dc402f262c41cf67f2f',
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
'encoded_cmd': '1502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
'encoded_tpdu': '40048111227FF6407070611535002702700000221502011212B0000000000000020059E3EFF2E21D3809AA0A220880F28000024F0000',
},
'response': {
'encoded_resp': '027100001c12b000118bb989492c632529326a2f4681feb37c825bc9021c9f6d0b',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
'encoded_resp': '027100000B0AB000000000000000000A9000',
'response_status': 'insufficient_security_level',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_CC,
'name': 'AES128-eSIM-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_eSIM_AES128,
'spi': SPI_CC_CTR_POR_UNCIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506093535b00011c49ac91ab8159ba5b83a54fb6385e0a5e31694f8b215fafc',
'apdu': h2b('AA0A220880F28000024F0000'), #b'\xaa\x0a\x22\x08\x80\xf2\x80\x00\x02\x4f\x00\x00',
'encoded_cmd': '00281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
'encoded_tpdu': '40048111227FF6407070611535002D02700000281516091212B00000DD103B36C3BFBE9AA58BCE50D15DE123EE350BB19951DE24FD879A26FF252234',
},
'response': {
'encoded_resp': '027100001612b0001100000000000000b5bcd6353a421fae016132',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
}, {
'name': '3DES-SJA5-UNCIPHERED-NOCC',
'ota_keyset': OTA_KEYSET_SJA5_SAMPLES,
'spi': SPI_CC_POR_UNCIPHERED_NOCC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
'encoded_tpdu': '400881214365877ff6227052000000000302700000201506013535b000113190be334900f52b025f3f7eddfe868e96ebf310023b7769',
},
'response': {
'encoded_resp': '027100000e0ab0001100000000000000016132',
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
}, {
'name': 'AES128-SJA5-CIPHERED-CC',
'ota_keyset': OTA_KEYSET_SJA5_AES128,
'spi': SPI_CC_POR_CIPHERED_CC,
'request': {
'apdu': b'\x00\xa4\x00\x04\x02\x3f\x00',
'encoded_cmd': '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
'encoded_tpdu': '400881214365877ff6227052000000000302700000281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058',
},
'response': {
'encoded_resp': '027100002412b00011ebc6b497e2cad7aedf36ace0e3a29b38853f0fe9ccde81913be5702b73abce1f',
'encoded_resp': '027100001C12B00000000000000200007DB7086951F7DB55AB0780010123026D009000',
#'encoded_resp': '027100000b0ab0000100000000000006'
#'encoded_resp': '027100002412b000019a551bb7c28183652de0ace6170d0e563c5e949a3ba56747fe4c1dbbef16642c'
'response_status': 'por_ok',
'number_of_commands': 1,
'last_status_word': '6132',
'last_response_data': '',
}
},
# TODO: AES192
# TODO: AES256
]
@@ -272,7 +375,7 @@ class SmsOtaTestCase(OtaTestCase):
kset = t['ota_keyset']
outp = self.dialect.encode_cmd(kset, self.tar, t['spi'], apdu=t['request']['apdu'])
#print("result: %s" % b2h(outp))
self.assertEqual(b2h(outp), t['request']['encoded_cmd'])
self.assertEqual(b2h(outp), t['request']['encoded_cmd'].lower())
with_udh = b'\x02\x70\x00' + outp
#print("with_udh: %s" % b2h(with_udh))
@@ -281,7 +384,7 @@ class SmsOtaTestCase(OtaTestCase):
tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
#print("TPDU: %s" % tpdu)
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'].lower())
# also test decoder
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(kset, outp)
@@ -296,9 +399,12 @@ class SmsOtaTestCase(OtaTestCase):
r, d = self.dialect.decode_resp(kset, t['spi'], t['response']['encoded_resp'])
#print("RESP: %s / %s" % (r, d))
self.assertEqual(r.response_status, t['response']['response_status'])
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
if 'number_of_commands' in t['response']:
self.assertEqual(d.number_of_commands, t['response']['number_of_commands'])
if 'last_status_word' in t['response']:
self.assertEqual(d.last_status_word, t['response']['last_status_word'])
if 'last_response_data' in t['response']:
self.assertEqual(d.last_response_data, t['response']['last_response_data'])
if __name__ == "__main__":
unittest.main()

301
vpcd2smpp.py Executable file
View File

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
#
# This program receive APDUs via the VPCD protocol of Frank Morgner's
# virtualsmartcard, encrypts them with OTA (over the air) keys and
# forwards them via SMPP to a SMSC (SMS service centre).
#
# In other words, you can use it as a poor man's OTA server, to enable
# you to use unmodified application software with PC/SC support to talk
# securely via OTA with a remote SMS card.
#
# This is very much a work in progress at this point.
#######################################################################
# twisted VPCD Library
#######################################################################
import logging
import struct
import abc
from typing import Union, Optional
from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
class VirtualCard(abc.ABC):
"""Abstract base class for a virtual smart card."""
def __init__(self, atr: Union[str, bytes]):
if isinstance(atr, str):
atr = h2b(atr)
self.atr = atr
@abc.abstractmethod
def power_change(self, new_state: bool):
"""Power the card on or off."""
pass
@abc.abstractmethod
def reset(self):
"""Reset the card."""
pass
@abc.abstractmethod
def rx_c_apdu(self, apdu: bytes):
"""Receive a C-APDU from the reader/application."""
pass
def tx_r_apdu(self, apdu: Union[str, bytes]):
if isinstance(apdu, str):
apdu = h2b(apdu)
logger.info("R-APDU: %s" % b2h(apdu))
self.protocol.send_data(apdu)
class VpcdProtocolBase(Protocol):
# Prefixed couldn't be used as the this.length wouldn't be available in this case
construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
'data'/If(this.length > 1, Bytes(this.length)),
'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
def __init__(self, vcard: VirtualCard):
self.recvBuffer = b''
self.connectionCorrupted = False
self.pduReadTimer = None
self.pduReadTimerSecs = 10
self.callLater = reactor.callLater
self.on = False
self.vcard = vcard
self.vcard.protocol = self
def dataReceived(self, data: bytes):
"""entry point where twisted tells us data was received."""
#logger.debug('Data received: %s' % b2h(data))
self.recvBuffer = self.recvBuffer + data
while True:
if self.connectionCorrupted:
return
msg = self.readMessage()
if msg is None:
break
self.endPDURead()
self.rawMessageReceived(msg)
if len(self.recvBuffer) > 0:
self.incompletePDURead()
def incompletePDURead(self):
"""We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
if self.pduReadTimer and self.pduReadTimer.active():
return
self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
def endPDURead(self):
"""We completed reading a PDU, cancel the pduReadTimer."""
if self.pduReadTimer and self.pduReadTimer.active():
self.pduReadTimer.cancel()
def readMessage(self) -> Optional[bytes]:
"""read an entire [raw] message."""
pduLen = self._getMessageLength()
if pduLen is None:
return None
return self._getMessage(pduLen)
def _getMessageLength(self) -> Optional[int]:
if len(self.recvBuffer) < 2:
return None
return struct.unpack('!H', self.recvBuffer[:2])[0]
def _getMessage(self, pduLen: int) -> Optional[bytes]:
if len(self.recvBuffer) < pduLen+2:
return None
message = self.recvBuffer[:pduLen+2]
self.recvBuffer = self.recvBuffer[pduLen+2:]
return message
def onPDUReadTimeout(self):
logger.error('PDU read timed out. Buffer is now considered corrupt')
#self.coruptDataReceived
def rawMessageReceived(self, message: bytes):
"""Called once a complete binary vpcd message has been received."""
pdu = None
try:
pdu = VpcdProtocolBase.construct.parse(message)
except Exception as e:
logger.exception(e)
logger.critical('Received corrupt PDU %s' % b2h(message))
#self.corupDataRecvd()
else:
self.PDUReceived(pdu)
def PDUReceived(self, pdu):
logger.debug("Rx PDU: %s" % pdu)
if pdu['data']:
return self.on_rx_data(pdu)
else:
method = getattr(self, 'on_rx_' + pdu['ctrl'])
return method(pdu)
def on_rx_atr(self, pdu):
self.send_data(self.vcard.atr)
def on_rx_on(self, pdu):
if self.on:
return
else:
self.on = True
self.vcard.power_change(self.on)
def on_rx_reset(self, pdu):
self.vcard.reset()
def on_rx_off(self, pdu):
if not self.on:
return
else:
self.on = False
self.vcard.power_change(self.on)
def on_rx_data(self, pdu):
self.vcard.rx_c_apdu(pdu['data'])
def send_pdu(self, pdu):
logger.debug("Sending PDU: %s" % pdu)
encoded = VpcdProtocolBase.construct.build(pdu)
#logger.debug("Sending binary: %s" % b2h(encoded))
self.transport.write(encoded)
def send_data(self, data: Union[str, bytes]):
if isinstance(data, str):
data = h2b(data)
return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
def send_ctrl(self, ctrl: str):
return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
class VpcdProtocolClient(VpcdProtocolBase):
pass
class VpcdClientFactory(ReconnectingClientFactory):
def __init__(self, vcard_class: VirtualCard):
self.vcard_class = vcard_class
def startedConnecting(self, connector):
logger.debug('Started to connect')
def buildProtocol(self, addr):
logger.info('Connection established to %s' % addr)
self.resetDelay()
return VpcdProtocolClient(vcard = self.vcard_class())
def clientConnectionLost(self, connector, reason):
logger.warning('Connection lost (reason: %s)' % reason)
super().clientConnectionLost(connector, reason)
def clientConnectionFailed(self, connector, reason):
logger.warning('Connection failed (reason: %s)' % reason)
super().clientConnectionFailed(connector, reason)
#######################################################################
# Application
#######################################################################
from pprint import pprint as pp
from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
from twisted.internet import reactor
from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
from smpp.twisted.protocol import SMPPClientProtocol
from smpp.twisted.config import SMPPClientConfig
from smpp.pdu.operations import SubmitSM, DeliverSM
from smpp.pdu import pdu_types
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
class MyVcard(VirtualCard):
def __init__(self, **kwargs):
super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
self.smpp_client = None
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
self.ota_dialect = OtaDialectSms()
self.tar = h2b('B00011')
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def ensure_smpp(self):
config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
if self.smpp_client:
return
self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
smpp = self.smpp_client.connectAndBind()
#self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
#d = self.smpp.connectTCP(config.host, config.port)
#d = self.smpp.connectAndBind()
#d.addCallback(self.forwardToClient, self.smpp)
def power_change(self, new_state: bool):
if new_state:
logger.info("POWER ON")
self.ensure_smpp()
else:
logger.info("POWER OFF")
def reset(self):
logger.info("RESET")
def rx_c_apdu(self, apdu: bytes):
pp(self.smpp_client.smpp)
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
#self.tx_r_apdu('9000')
def tx_sms_tpdu(self, tpdu: bytes):
"""Send a SMS TPDU via SMPP SubmitSM."""
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
protocol_id=0x7f, short_message=tpdu)
self.smpp_client.smpp.sendDataRequest(submit)
def handleSmpp(self, smpp, pdu):
#logger.info("Received SMPP %s" % pdu)
data = pdu.params['short_message']
#logger.info("Received SMS Data %s" % b2h(data))
r, d = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
logger.info("Decoded SMPP %s" % r)
self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
if __name__ == '__main__':
import logging
logger = logging.getLogger(__name__)
import colorlog
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
colorlog.basicConfig(level=logging.INFO, format = log_format)
logger = colorlog.getLogger()
from twisted.internet import reactor
host = 'localhost'
port = 35963
reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
reactor.run()