Compare commits
5 Commits
laforge/ot
...
laforge/ws
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9bb63a2db | ||
|
|
a01e87da77 | ||
|
|
53e840ad86 | ||
|
|
d15c3d1319 | ||
|
|
671b0f19b6 |
112
contrib/wsrc_card_client.py
Executable file
112
contrib/wsrc_card_client.py
Executable file
@@ -0,0 +1,112 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||||
|
perform commands on it."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import argparse
|
||||||
|
from osmocom.utils import b2h
|
||||||
|
import websockets
|
||||||
|
|
||||||
|
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
|
||||||
|
from pySim.commands import SimCardCommands
|
||||||
|
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||||
|
from pySim.exceptions import NoCardError
|
||||||
|
from pySim.wsrc import WSRC_DEFAULT_PORT_CARD
|
||||||
|
|
||||||
|
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class CardWsClientBlocking(WsClientBlocking):
|
||||||
|
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
|
||||||
|
|
||||||
|
def __init__(self, ws_uri, tp: LinkBase):
|
||||||
|
super().__init__('card', ws_uri)
|
||||||
|
self.tp = tp
|
||||||
|
|
||||||
|
def perform_outbound_hello(self):
|
||||||
|
hello_data = {
|
||||||
|
'atr': b2h(self.tp.get_atr()),
|
||||||
|
# TODO: include various card information in the HELLO message
|
||||||
|
}
|
||||||
|
super().perform_outbound_hello(hello_data)
|
||||||
|
|
||||||
|
def handle_rx_c_apdu(self, rx: dict):
|
||||||
|
"""handle an inbound APDU transceive command"""
|
||||||
|
data, sw = self.tp.send_apdu(rx['command'])
|
||||||
|
tx = {
|
||||||
|
'response': data,
|
||||||
|
'sw': sw,
|
||||||
|
}
|
||||||
|
self.tx_json('r_apdu', tx)
|
||||||
|
|
||||||
|
def handle_rx_disconnect(self, rx: dict):
|
||||||
|
"""server tells us to disconnect"""
|
||||||
|
self.tx_json('disconnect_ack')
|
||||||
|
# FIXME: tear down connection and/or terminate entire program
|
||||||
|
|
||||||
|
def handle_rx_state_notification(self, rx: dict):
|
||||||
|
logger.info("State Notification: %s" % rx['new_state'])
|
||||||
|
|
||||||
|
def handle_rx_print(self, rx: dict):
|
||||||
|
"""print a message (text) given by server to the local console/log"""
|
||||||
|
logger.info("SERVER MSG: %s" % rx['message'])
|
||||||
|
# no response
|
||||||
|
|
||||||
|
def handle_rx_reset_req(self, rx: dict):
|
||||||
|
"""server tells us to reset the card"""
|
||||||
|
self.tp.reset_card()
|
||||||
|
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
|
||||||
|
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
argparse_add_reader_args(parser)
|
||||||
|
parser.add_argument("--uri", default="ws://localhost:%u/" % (WSRC_DEFAULT_PORT_CARD),
|
||||||
|
help="URI of the sever to which to connect")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
opts = parser.parse_args()
|
||||||
|
|
||||||
|
# open the card reader / slot
|
||||||
|
logger.info("Initializing Card Reader...")
|
||||||
|
try:
|
||||||
|
tp = init_reader(opts)
|
||||||
|
except Exception as e:
|
||||||
|
logger.fatal("Error opening reader: %s" % e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
logger.info("Connecting to Card...")
|
||||||
|
try:
|
||||||
|
tp.connect()
|
||||||
|
except NoCardError as e:
|
||||||
|
logger.fatal("Error opening card! Is a card inserted in the reader?")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
scc = SimCardCommands(transport=tp)
|
||||||
|
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
|
||||||
|
|
||||||
|
# TODO: gather various information about the card; print it
|
||||||
|
|
||||||
|
# create + connect the client to the server
|
||||||
|
cl = CardWsClientBlocking(opts.uri, tp)
|
||||||
|
logger.info("Connecting to remote server...")
|
||||||
|
try:
|
||||||
|
cl.connect()
|
||||||
|
logger.info("Successfully connected to Server")
|
||||||
|
except ConnectionRefusedError as e:
|
||||||
|
logger.fatal(e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# endless loop: wait for inbound command from server + execute it
|
||||||
|
cl.rx_and_execute_cmd()
|
||||||
|
except websockets.exceptions.ConnectionClosedOK as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(1)
|
||||||
|
except KeyboardInterrupt as e:
|
||||||
|
print(e.__class__.__name__)
|
||||||
|
sys.exit(2)
|
||||||
354
contrib/wsrc_server.py
Executable file
354
contrib/wsrc_server.py
Executable file
@@ -0,0 +1,354 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import json
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import argparse
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
from websockets.asyncio.server import serve
|
||||||
|
from websockets.exceptions import ConnectionClosedError
|
||||||
|
from osmocom.utils import Hexstr, swap_nibbles
|
||||||
|
|
||||||
|
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
|
||||||
|
from pySim.exceptions import SwMatchError
|
||||||
|
from pySim.wsrc import WSRC_DEFAULT_PORT_USER, WSRC_DEFAULT_PORT_CARD
|
||||||
|
|
||||||
|
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
card_clients = set()
|
||||||
|
user_clients = set()
|
||||||
|
|
||||||
|
class WsClientLogAdapter(logging.LoggerAdapter):
|
||||||
|
"""LoggerAdapter adding context (for now: remote IP/Port of client) to log"""
|
||||||
|
def process(self, msg, kwargs):
|
||||||
|
return '%s([%s]:%u) %s' % (self.extra['type'], self.extra['remote_addr'][0],
|
||||||
|
self.extra['remote_addr'][1], msg), kwargs
|
||||||
|
|
||||||
|
class WsClient:
|
||||||
|
def __init__(self, websocket, hello: dict):
|
||||||
|
self.websocket = websocket
|
||||||
|
self.hello = hello
|
||||||
|
self.identity = {}
|
||||||
|
self.logger = WsClientLogAdapter(logger, {'type': self.__class__.__name__,
|
||||||
|
'remote_addr': websocket.remote_address})
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '%s([%s]:%u)' % (self.__class__.__name__, self.websocket.remote_address[0],
|
||||||
|
self.websocket.remote_address[1])
|
||||||
|
|
||||||
|
async def rx_json(self):
|
||||||
|
rx = await self.websocket.recv()
|
||||||
|
rx_js = json.loads(rx)
|
||||||
|
self.logger.debug("Rx: %s", rx_js)
|
||||||
|
assert 'msg_type' in rx
|
||||||
|
return rx_js
|
||||||
|
|
||||||
|
async def tx_json(self, msg_type:str, d: dict = {}):
|
||||||
|
"""Transmit a json-serializable dict to the peer"""
|
||||||
|
d['msg_type'] = msg_type
|
||||||
|
d_js = json.dumps(d)
|
||||||
|
self.logger.debug("Tx: %s", d_js)
|
||||||
|
await self.websocket.send(d_js)
|
||||||
|
|
||||||
|
async def tx_hello_ack(self):
|
||||||
|
await self.tx_json('hello_ack')
|
||||||
|
|
||||||
|
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
|
||||||
|
await self.tx_json(msg_type, d)
|
||||||
|
rx = await self.rx_json()
|
||||||
|
if exp_msg_type:
|
||||||
|
assert rx['msg_type'] == exp_msg_type
|
||||||
|
return rx;
|
||||||
|
|
||||||
|
async def tx_error(self, message: str):
|
||||||
|
"""Transmit an error message to the peer"""
|
||||||
|
event = {
|
||||||
|
"message": message,
|
||||||
|
}
|
||||||
|
self.logger.error("Transmitting error message: '%s'" % message)
|
||||||
|
await self.tx_json('error', event)
|
||||||
|
|
||||||
|
# async def ws_hdlr(self):
|
||||||
|
# """kind of a 'main' function for the websocket client: wait for incoming message,
|
||||||
|
# and handle it."""
|
||||||
|
# try:
|
||||||
|
# async for message in self.websocket:
|
||||||
|
# method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
|
||||||
|
# if not method:
|
||||||
|
# await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
|
||||||
|
# else:
|
||||||
|
# method(message)
|
||||||
|
# except ConnectionClosedError:
|
||||||
|
# # we handle this in the outer loop
|
||||||
|
# pass
|
||||||
|
|
||||||
|
class CardClient(WsClient):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.identity['ATR'] = self.hello['atr']
|
||||||
|
self.state = 'init'
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
eid = self.identity.get('EID', None)
|
||||||
|
if eid:
|
||||||
|
return '%s(EID=%s)' % (self.__class__.__name__, eid)
|
||||||
|
iccid = self.identity.get('ICCID', None)
|
||||||
|
if iccid:
|
||||||
|
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
|
||||||
|
return super().__str__()
|
||||||
|
|
||||||
|
def listing_entry(self) -> dict:
|
||||||
|
return {'remote_addr': self.websocket.remote_address,
|
||||||
|
'identities': self.identity,
|
||||||
|
'state': self.state}
|
||||||
|
|
||||||
|
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
|
||||||
|
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||||
|
"""transceive a single APDU with the card"""
|
||||||
|
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||||
|
return message['response'], message['sw']
|
||||||
|
|
||||||
|
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
|
||||||
|
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
|
||||||
|
prev_pdu = pdu
|
||||||
|
data, sw = await self.xceive_apdu_raw(pdu)
|
||||||
|
|
||||||
|
if sw is not None:
|
||||||
|
while (sw[0:2] in ['9f', '61', '62', '63']):
|
||||||
|
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
|
||||||
|
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
|
||||||
|
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
|
||||||
|
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
|
||||||
|
prev_pdu = pdu_gr
|
||||||
|
d, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||||
|
data += d
|
||||||
|
if sw[0:2] == '6c':
|
||||||
|
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
||||||
|
pdu_gr = prev_pdu[0:8] + sw[2:4]
|
||||||
|
data, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||||
|
|
||||||
|
return data, sw
|
||||||
|
|
||||||
|
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
|
||||||
|
"""like xceive_apdu, but checking the status word matches the expected pattern"""
|
||||||
|
rv = await self.xceive_apdu(pdu)
|
||||||
|
last_sw = rv[1]
|
||||||
|
|
||||||
|
if not sw_match(rv[1], sw):
|
||||||
|
raise SwMatchError(rv[1], sw.lower())
|
||||||
|
return rv
|
||||||
|
|
||||||
|
async def card_reset(self):
|
||||||
|
"""reset the card"""
|
||||||
|
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
|
||||||
|
self.identity['ATR'] = rx['atr']
|
||||||
|
|
||||||
|
async def notify_state(self):
|
||||||
|
"""notify the card client of a state [change]"""
|
||||||
|
await self.tx_json('state_notification', {'new_state': self.state})
|
||||||
|
|
||||||
|
async def get_iccid(self):
|
||||||
|
"""high-level method to obtain the ICCID of the card"""
|
||||||
|
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
|
||||||
|
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
|
||||||
|
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
|
||||||
|
return dec_iccid(res)
|
||||||
|
|
||||||
|
async def get_eid_sgp22(self):
|
||||||
|
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
|
||||||
|
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
|
||||||
|
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
|
||||||
|
return res[-32:]
|
||||||
|
|
||||||
|
async def set_state(self, new_state:str):
|
||||||
|
self.logger.info("Card now in '%s' state" % new_state)
|
||||||
|
self.state = new_state
|
||||||
|
await self.notify_state()
|
||||||
|
|
||||||
|
async def identify(self):
|
||||||
|
# identify the card by asking for its EID and/or ICCID
|
||||||
|
try:
|
||||||
|
eid = await self.get_eid_sgp22()
|
||||||
|
self.logger.debug("EID: %s", eid)
|
||||||
|
self.identity['EID'] = eid
|
||||||
|
except SwMatchError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
iccid = await self.get_iccid()
|
||||||
|
self.logger.debug("ICCID: %s", iccid)
|
||||||
|
self.identity['ICCID'] = iccid
|
||||||
|
except SwMatchError:
|
||||||
|
pass
|
||||||
|
await self.set_state('ready')
|
||||||
|
|
||||||
|
async def associate_user(self, user):
|
||||||
|
assert self.state == 'ready'
|
||||||
|
self.user = user
|
||||||
|
await self.set_state('associated')
|
||||||
|
|
||||||
|
async def disassociate_user(self):
|
||||||
|
assert self.user
|
||||||
|
assert self.state == 'associated'
|
||||||
|
await self.set_state('ready')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
|
||||||
|
for c in card_clients:
|
||||||
|
if c.state != 'ready':
|
||||||
|
continue
|
||||||
|
c_id = c.identity.get(id_type.upper(), None)
|
||||||
|
if c_id and c_id.lower() == id_str.lower():
|
||||||
|
return c
|
||||||
|
return None
|
||||||
|
|
||||||
|
class UserClient(WsClient):
|
||||||
|
"""A websocket client representing a user application like pySim-shell."""
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.state = 'init'
|
||||||
|
self.card = None
|
||||||
|
|
||||||
|
async def associate_card(self, card: CardClient):
|
||||||
|
assert self.state == 'init'
|
||||||
|
self.card = card
|
||||||
|
self.state = 'associated'
|
||||||
|
|
||||||
|
async def disassociate_card(self):
|
||||||
|
assert self.state == 'associated'
|
||||||
|
self.card = None
|
||||||
|
self.state = 'init'
|
||||||
|
|
||||||
|
async def state_init(self):
|
||||||
|
"""Wait for incoming 'select_card' and process it."""
|
||||||
|
while True:
|
||||||
|
rx = await self.rx_json()
|
||||||
|
if rx['msg_type'] == 'select_card':
|
||||||
|
# look-up if the card can be found
|
||||||
|
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
|
||||||
|
if not card:
|
||||||
|
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
|
||||||
|
continue
|
||||||
|
# transition to next statee
|
||||||
|
await card.associate_user(self)
|
||||||
|
await self.associate_card(card)
|
||||||
|
await self.tx_json('select_card_ack', {'identities': card.identity})
|
||||||
|
break
|
||||||
|
elif rx['msg_type'] == 'list_cards':
|
||||||
|
res = [x.listing_entry() for x in card_clients]
|
||||||
|
await self.tx_json('list_cards_ack', {'cards': res})
|
||||||
|
else:
|
||||||
|
self.tx_error('Unknown message type %s' % rx['msg_type'])
|
||||||
|
|
||||||
|
async def state_selected(self):
|
||||||
|
while True:
|
||||||
|
rx = await self.rx_json()
|
||||||
|
if rx['msg_type'] == 'c_apdu':
|
||||||
|
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
|
||||||
|
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
|
||||||
|
elif rx['msg_type'] == 'reset_req':
|
||||||
|
await self.card.card_reset()
|
||||||
|
await self.tx_json('reset_resp')
|
||||||
|
else:
|
||||||
|
self.logger.warning("Unknown/unsupported command '%s' received" % rx['msg_type'])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def card_conn_hdlr(websocket):
|
||||||
|
"""Handler for incoming connection to 'card' port."""
|
||||||
|
# receive first message, which should be a 'hello'
|
||||||
|
rx_raw = await websocket.recv()
|
||||||
|
rx = json.loads(rx_raw)
|
||||||
|
assert rx['msg_type'] == 'hello'
|
||||||
|
client_type = rx['client_type']
|
||||||
|
|
||||||
|
if client_type != 'card':
|
||||||
|
logger.error("Rejecting client (unknown type %s) connection", client_type)
|
||||||
|
raise ValueError("client_type '%s' != expected 'card'" % client_type)
|
||||||
|
|
||||||
|
card = CardClient(websocket, rx)
|
||||||
|
card.logger.info("connection accepted")
|
||||||
|
# first go through identity phase
|
||||||
|
async with asyncio.timeout(30):
|
||||||
|
await card.tx_hello_ack()
|
||||||
|
card.logger.info("hello-handshake completed")
|
||||||
|
card_clients.add(card)
|
||||||
|
# first obtain the identity of the card
|
||||||
|
await card.identify()
|
||||||
|
# then go into the "main loop"
|
||||||
|
try:
|
||||||
|
# wait 'indefinitely'. We cannot call websocket.recv() here, as we will call another
|
||||||
|
# recv() while waiting for the R-APDU after forwarding one from the user client.
|
||||||
|
await websocket.wait_closed()
|
||||||
|
finally:
|
||||||
|
card.logger.info("connection closed")
|
||||||
|
card_clients.remove(card)
|
||||||
|
|
||||||
|
|
||||||
|
async def user_conn_hdlr(websocket):
|
||||||
|
"""Handler for incoming connection to 'user' port."""
|
||||||
|
# receive first message, which should be a 'hello'
|
||||||
|
rx_raw = await websocket.recv()
|
||||||
|
rx = json.loads(rx_raw)
|
||||||
|
assert rx['msg_type'] == 'hello'
|
||||||
|
client_type = rx['client_type']
|
||||||
|
|
||||||
|
if client_type != 'user':
|
||||||
|
logger.error("Rejecting client (unknown type %s) connection", client_type)
|
||||||
|
raise ValueError("client_type '%s' != expected 'card'" % client_type)
|
||||||
|
|
||||||
|
user = UserClient(websocket, rx)
|
||||||
|
user.logger.info("connection accepted")
|
||||||
|
# first go through hello phase
|
||||||
|
async with asyncio.timeout(10):
|
||||||
|
await user.tx_hello_ack()
|
||||||
|
user.logger.info("hello-handshake completed")
|
||||||
|
user_clients.add(user)
|
||||||
|
# first wait for the user to specify the select the card
|
||||||
|
try:
|
||||||
|
await user.state_init()
|
||||||
|
except ConnectionClosedError:
|
||||||
|
user.logger.info("connection closed")
|
||||||
|
user_clients.remove(user)
|
||||||
|
return
|
||||||
|
except asyncio.exceptions.CancelledError: # direct cause of TimeoutError
|
||||||
|
user.logger.error("User failed to transition to 'associated' state within 10s")
|
||||||
|
user_clients.remove(user)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
user.logger.error("Unknown exception %s", e)
|
||||||
|
raise e
|
||||||
|
user_clients.remove(user)
|
||||||
|
return
|
||||||
|
# then perform APDU exchanges without any time limit
|
||||||
|
try:
|
||||||
|
await user.state_selected()
|
||||||
|
except ConnectionClosedError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
user.logger.info("connection closed")
|
||||||
|
await user.card.disassociate_user()
|
||||||
|
await user.disassociate_card()
|
||||||
|
user_clients.remove(user)
|
||||||
|
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="""Osmocom WebSocket Remote Card server""")
|
||||||
|
parser.add_argument('--user-bind-port', type=int, default=WSRC_DEFAULT_PORT_USER,
|
||||||
|
help="port number to bind for connections from users")
|
||||||
|
parser.add_argument('--user-bind-host', type=str, default='localhost',
|
||||||
|
help="local Host/IP to bind for connections from users")
|
||||||
|
parser.add_argument('--card-bind-port', type=int, default=WSRC_DEFAULT_PORT_CARD,
|
||||||
|
help="port number to bind for connections from cards")
|
||||||
|
parser.add_argument('--card-bind-host', type=str, default='localhost',
|
||||||
|
help="local Host/IP to bind for connections from cards")
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
opts = parser.parse_args()
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# we use different ports for user + card connections to ensure they can
|
||||||
|
# have different packet filter rules apply to them
|
||||||
|
async with serve(card_conn_hdlr, opts.card_bind_host, opts.card_bind_port), serve(user_conn_hdlr, opts.user_bind_host, opts.user_bind_port):
|
||||||
|
await asyncio.get_running_loop().create_future() # run forever
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
@@ -43,6 +43,7 @@ pySim consists of several parts:
|
|||||||
legacy
|
legacy
|
||||||
library
|
library
|
||||||
osmo-smdpp
|
osmo-smdpp
|
||||||
|
wsrc
|
||||||
|
|
||||||
|
|
||||||
Indices and tables
|
Indices and tables
|
||||||
|
|||||||
57
docs/wsrc.rst
Normal file
57
docs/wsrc.rst
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
WebSocket Remote Card (WSRC)
|
||||||
|
============================
|
||||||
|
|
||||||
|
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
|
||||||
|
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
|
||||||
|
method was chosen to be as firewall/NAT friendly as possible.
|
||||||
|
|
||||||
|
WSRC Network Architecture
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
In a WSRC network, there are three major elements:
|
||||||
|
|
||||||
|
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
|
||||||
|
to a remote *WSRC Server*
|
||||||
|
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
|
||||||
|
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
|
||||||
|
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
|
||||||
|
|
||||||
|
WSRC Protocol
|
||||||
|
-------------
|
||||||
|
|
||||||
|
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
|
||||||
|
is based on HTTP and should usually operate via TLS for security reasons.
|
||||||
|
|
||||||
|
The detailed protocol is currently still WIP. The plan is to document it here.
|
||||||
|
|
||||||
|
|
||||||
|
pySim implementations
|
||||||
|
---------------------
|
||||||
|
|
||||||
|
|
||||||
|
wsrc_server
|
||||||
|
~~~~~~~~~~~~~~~~
|
||||||
|
.. argparse::
|
||||||
|
:filename: ../contrib/wsrc_server.py
|
||||||
|
:func: parser
|
||||||
|
:prog: contrib/wsrc_server.py
|
||||||
|
|
||||||
|
|
||||||
|
wsrc_card_client
|
||||||
|
~~~~~~~~~~~~~~~~
|
||||||
|
.. argparse::
|
||||||
|
:filename: ../contrib/wsrc_card_client.py
|
||||||
|
:func: parser
|
||||||
|
:prog: contrib/wsrc_card_client.py
|
||||||
|
|
||||||
|
pySim-shell
|
||||||
|
~~~~~~~~~~~
|
||||||
|
|
||||||
|
pySim-shell can talk to a remote card via WSRC if you use the *wsrc transport*, for example like this:
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
./pySim-shell.py --wsrc-eid 89882119900000000000000000007280 --wsrc-serer-url ws://localhost:4220/
|
||||||
|
|
||||||
|
|
||||||
|
You can specify `--wsrc-eid` or `--wsrc-iccid` to identify the remote eUICC or UICC you would like to select.
|
||||||
@@ -39,7 +39,7 @@ from pySim.commands import SimCardCommands
|
|||||||
from pySim.transport import init_reader, argparse_add_reader_args
|
from pySim.transport import init_reader, argparse_add_reader_args
|
||||||
from pySim.legacy.cards import _cards_classes, card_detect
|
from pySim.legacy.cards import _cards_classes, card_detect
|
||||||
from pySim.utils import derive_milenage_opc, calculate_luhn, dec_iccid
|
from pySim.utils import derive_milenage_opc, calculate_luhn, dec_iccid
|
||||||
from pySim.ts_51_011 import EF_AD
|
from pySim.profile.ts_51_011 import EF_AD
|
||||||
from pySim.legacy.ts_51_011 import EF
|
from pySim.legacy.ts_51_011 import EF
|
||||||
from pySim.card_handler import *
|
from pySim.card_handler import *
|
||||||
from pySim.utils import *
|
from pySim.utils import *
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ import sys
|
|||||||
|
|
||||||
from osmocom.utils import h2b, h2s, swap_nibbles, rpad
|
from osmocom.utils import h2b, h2s, swap_nibbles, rpad
|
||||||
|
|
||||||
from pySim.ts_51_011 import EF_SST_map, EF_AD
|
from pySim.profile.ts_51_011 import EF_SST_map, EF_AD
|
||||||
from pySim.legacy.ts_51_011 import EF, DF
|
from pySim.legacy.ts_51_011 import EF, DF
|
||||||
from pySim.ts_31_102 import EF_UST_map
|
from pySim.ts_31_102 import EF_UST_map
|
||||||
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
||||||
|
|||||||
@@ -60,7 +60,6 @@ from pySim.card_handler import CardHandler, CardHandlerAuto
|
|||||||
from pySim.filesystem import CardMF, CardEF, CardDF, CardADF, LinFixedEF, TransparentEF, BerTlvEF
|
from pySim.filesystem import CardMF, CardEF, CardDF, CardADF, LinFixedEF, TransparentEF, BerTlvEF
|
||||||
from pySim.ts_102_221 import pin_names
|
from pySim.ts_102_221 import pin_names
|
||||||
from pySim.ts_102_222 import Ts102222Commands
|
from pySim.ts_102_222 import Ts102222Commands
|
||||||
from pySim.gsm_r import DF_EIRENE
|
|
||||||
from pySim.cat import ProactiveCommand
|
from pySim.cat import ProactiveCommand
|
||||||
|
|
||||||
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from osmocom.utils import JsonEncoder
|
|||||||
from pySim.cards import UiccCardBase
|
from pySim.cards import UiccCardBase
|
||||||
from pySim.commands import SimCardCommands
|
from pySim.commands import SimCardCommands
|
||||||
from pySim.profile import CardProfile
|
from pySim.profile import CardProfile
|
||||||
from pySim.ts_102_221 import CardProfileUICC
|
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||||
from pySim.ts_31_102 import CardApplicationUSIM
|
from pySim.ts_31_102 import CardApplicationUSIM
|
||||||
from pySim.ts_31_103 import CardApplicationISIM
|
from pySim.ts_31_103 import CardApplicationISIM
|
||||||
from pySim.euicc import CardApplicationISDR, CardApplicationECASD
|
from pySim.euicc import CardApplicationISDR, CardApplicationECASD
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ from pySim.filesystem import CardModel, CardApplication
|
|||||||
from pySim.cards import card_detect, SimCardBase, UiccCardBase, CardBase
|
from pySim.cards import card_detect, SimCardBase, UiccCardBase, CardBase
|
||||||
from pySim.runtime import RuntimeState
|
from pySim.runtime import RuntimeState
|
||||||
from pySim.profile import CardProfile
|
from pySim.profile import CardProfile
|
||||||
from pySim.cdma_ruim import CardProfileRUIM
|
from pySim.profile.cdma_ruim import CardProfileRUIM
|
||||||
from pySim.ts_102_221 import CardProfileUICC
|
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||||
from pySim.utils import all_subclasses
|
from pySim.utils import all_subclasses
|
||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
|
|
||||||
@@ -40,7 +40,7 @@ import pySim.ts_31_103
|
|||||||
import pySim.ts_31_104
|
import pySim.ts_31_104
|
||||||
import pySim.ara_m
|
import pySim.ara_m
|
||||||
import pySim.global_platform
|
import pySim.global_platform
|
||||||
import pySim.euicc
|
import pySim.profile.euicc
|
||||||
|
|
||||||
def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState, SimCardBase]:
|
def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState, SimCardBase]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -25,8 +25,8 @@
|
|||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
from osmocom.utils import *
|
from osmocom.utils import *
|
||||||
|
|
||||||
from pySim.ts_102_221 import EF_DIR, CardProfileUICC
|
from pySim.profile.ts_102_221 import EF_DIR, CardProfileUICC
|
||||||
from pySim.ts_51_011 import DF_GSM
|
from pySim.profile.ts_51_011 import DF_GSM
|
||||||
from pySim.utils import SwHexstr
|
from pySim.utils import SwHexstr
|
||||||
from pySim.commands import Path, SimCardCommands
|
from pySim.commands import Path, SimCardCommands
|
||||||
|
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ from osmocom.tlv import bertlv_encode_len
|
|||||||
from pySim.utils import sw_match, expand_hex, SwHexstr, ResTuple, SwMatchstr
|
from pySim.utils import sw_match, expand_hex, SwHexstr, ResTuple, SwMatchstr
|
||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
from pySim.transport import LinkBase
|
from pySim.transport import LinkBase
|
||||||
|
from pySim.ts_102_221 import decode_select_response
|
||||||
|
|
||||||
# A path can be either just a FID or a list of FID
|
# A path can be either just a FID or a list of FID
|
||||||
Path = typing.Union[Hexstr, List[Hexstr]]
|
Path = typing.Union[Hexstr, List[Hexstr]]
|
||||||
@@ -176,40 +177,6 @@ class SimCardCommands:
|
|||||||
raise SwMatchError(sw, sw_exp.lower(), self._tp.sw_interpreter)
|
raise SwMatchError(sw, sw_exp.lower(), self._tp.sw_interpreter)
|
||||||
return (rsp, sw)
|
return (rsp, sw)
|
||||||
|
|
||||||
# Extract a single FCP item from TLV
|
|
||||||
def __parse_fcp(self, fcp: Hexstr):
|
|
||||||
# see also: ETSI TS 102 221, chapter 11.1.1.3.1 Response for MF,
|
|
||||||
# DF or ADF
|
|
||||||
from pytlv.TLV import TLV
|
|
||||||
tlvparser = TLV(['82', '83', '84', 'a5', '8a', '8b',
|
|
||||||
'8c', '80', 'ab', 'c6', '81', '88'])
|
|
||||||
|
|
||||||
# pytlv is case sensitive!
|
|
||||||
fcp = fcp.lower()
|
|
||||||
|
|
||||||
if fcp[0:2] != '62':
|
|
||||||
raise ValueError(
|
|
||||||
'Tag of the FCP template does not match, expected 62 but got %s' % fcp[0:2])
|
|
||||||
|
|
||||||
# Unfortunately the spec is not very clear if the FCP length is
|
|
||||||
# coded as one or two byte vale, so we have to try it out by
|
|
||||||
# checking if the length of the remaining TLV string matches
|
|
||||||
# what we get in the length field.
|
|
||||||
# See also ETSI TS 102 221, chapter 11.1.1.3.0 Base coding.
|
|
||||||
# TODO: this likely just is normal BER-TLV ("All data objects are BER-TLV except if otherwise # defined.")
|
|
||||||
exp_tlv_len = int(fcp[2:4], 16)
|
|
||||||
if len(fcp[4:]) // 2 == exp_tlv_len:
|
|
||||||
skip = 4
|
|
||||||
else:
|
|
||||||
exp_tlv_len = int(fcp[2:6], 16)
|
|
||||||
if len(fcp[4:]) // 2 == exp_tlv_len:
|
|
||||||
skip = 6
|
|
||||||
raise ValueError('Cannot determine length of TLV-length')
|
|
||||||
|
|
||||||
# Skip FCP tag and length
|
|
||||||
tlv = fcp[skip:]
|
|
||||||
return tlvparser.parse(tlv)
|
|
||||||
|
|
||||||
# Tell the length of a record by the card response
|
# Tell the length of a record by the card response
|
||||||
# USIMs respond with an FCP template, which is different
|
# USIMs respond with an FCP template, which is different
|
||||||
# from what SIMs responds. See also:
|
# from what SIMs responds. See also:
|
||||||
@@ -217,10 +184,8 @@ class SimCardCommands:
|
|||||||
# SIM: GSM 11.11, chapter 9.2.1 SELECT
|
# SIM: GSM 11.11, chapter 9.2.1 SELECT
|
||||||
def __record_len(self, r) -> int:
|
def __record_len(self, r) -> int:
|
||||||
if self.sel_ctrl == "0004":
|
if self.sel_ctrl == "0004":
|
||||||
tlv_parsed = self.__parse_fcp(r[-1])
|
fcp_parsed = decode_select_response(r[-1])
|
||||||
file_descriptor = tlv_parsed['82']
|
return fcp_parsed['file_descriptor']['record_len']
|
||||||
# See also ETSI TS 102 221, chapter 11.1.1.4.3 File Descriptor
|
|
||||||
return int(file_descriptor[4:8], 16)
|
|
||||||
else:
|
else:
|
||||||
return int(r[-1][28:30], 16)
|
return int(r[-1][28:30], 16)
|
||||||
|
|
||||||
@@ -228,8 +193,8 @@ class SimCardCommands:
|
|||||||
# above.
|
# above.
|
||||||
def __len(self, r) -> int:
|
def __len(self, r) -> int:
|
||||||
if self.sel_ctrl == "0004":
|
if self.sel_ctrl == "0004":
|
||||||
tlv_parsed = self.__parse_fcp(r[-1])
|
fcp_parsed = decode_select_response(r[-1])
|
||||||
return int(tlv_parsed['80'], 16)
|
return fcp_parsed['file_size']
|
||||||
else:
|
else:
|
||||||
return int(r[-1][4:8], 16)
|
return int(r[-1][4:8], 16)
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,6 @@ from osmocom.construct import *
|
|||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
from pySim.utils import Hexstr, SwHexstr, SwMatchstr
|
from pySim.utils import Hexstr, SwHexstr, SwMatchstr
|
||||||
from pySim.commands import SimCardCommands
|
from pySim.commands import SimCardCommands
|
||||||
from pySim.ts_102_221 import CardProfileUICC
|
|
||||||
import pySim.global_platform
|
import pySim.global_platform
|
||||||
|
|
||||||
# SGP.02 Section 2.2.2
|
# SGP.02 Section 2.2.2
|
||||||
@@ -557,43 +556,3 @@ class CardApplicationECASD(pySim.global_platform.CardApplicationSD):
|
|||||||
@with_default_category('Application-Specific Commands')
|
@with_default_category('Application-Specific Commands')
|
||||||
class AddlShellCommands(CommandSet):
|
class AddlShellCommands(CommandSet):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class CardProfileEuiccSGP32(CardProfileUICC):
|
|
||||||
ORDER = 5
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__(name='IoT eUICC (SGP.32)')
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
|
||||||
# try a command only supported by SGP.32
|
|
||||||
scc.cla_byte = "00"
|
|
||||||
scc.select_adf(AID_ISD_R)
|
|
||||||
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
|
|
||||||
|
|
||||||
class CardProfileEuiccSGP22(CardProfileUICC):
|
|
||||||
ORDER = 6
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__(name='Consumer eUICC (SGP.22)')
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
|
||||||
# try to read EID from ISD-R
|
|
||||||
scc.cla_byte = "00"
|
|
||||||
scc.select_adf(AID_ISD_R)
|
|
||||||
eid = CardApplicationISDR.get_eid(scc)
|
|
||||||
# TODO: Store EID identity?
|
|
||||||
|
|
||||||
class CardProfileEuiccSGP02(CardProfileUICC):
|
|
||||||
ORDER = 7
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__(name='M2M eUICC (SGP.02)')
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
|
||||||
scc.cla_byte = "00"
|
|
||||||
scc.select_adf(AID_ECASD)
|
|
||||||
scc.get_data(0x5a)
|
|
||||||
# TODO: Store EID identity?
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ from pySim.legacy.ts_51_011 import EF, DF
|
|||||||
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
||||||
from pySim.legacy.ts_31_103 import EF_ISIM_ADF_map
|
from pySim.legacy.ts_31_103 import EF_ISIM_ADF_map
|
||||||
|
|
||||||
from pySim.ts_51_011 import EF_AD, EF_SPN
|
from pySim.profile.ts_51_011 import EF_AD, EF_SPN
|
||||||
|
|
||||||
def format_addr(addr: str, addr_type: str) -> str:
|
def format_addr(addr: str, addr_type: str) -> str:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -148,7 +148,7 @@ def dec_st(st, table="sim") -> str:
|
|||||||
from pySim.ts_31_102 import EF_UST_map
|
from pySim.ts_31_102 import EF_UST_map
|
||||||
lookup_map = EF_UST_map
|
lookup_map = EF_UST_map
|
||||||
else:
|
else:
|
||||||
from pySim.ts_51_011 import EF_SST_map
|
from pySim.profile.ts_51_011 import EF_SST_map
|
||||||
lookup_map = EF_SST_map
|
lookup_map = EF_SST_map
|
||||||
|
|
||||||
st_bytes = [st[i:i+2] for i in range(0, len(st), 2)]
|
st_bytes = [st[i:i+2] for i in range(0, len(st), 2)]
|
||||||
|
|||||||
@@ -25,9 +25,9 @@ from osmocom.construct import *
|
|||||||
|
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
from pySim.profile import CardProfile, CardProfileAddon
|
from pySim.profile import CardProfile, CardProfileAddon
|
||||||
from pySim.ts_51_011 import CardProfileSIM
|
from pySim.profile.ts_51_011 import CardProfileSIM
|
||||||
from pySim.ts_51_011 import DF_TELECOM, DF_GSM
|
from pySim.profile.ts_51_011 import DF_TELECOM, DF_GSM
|
||||||
from pySim.ts_51_011 import EF_ServiceTable
|
from pySim.profile.ts_51_011 import EF_ServiceTable
|
||||||
|
|
||||||
|
|
||||||
# Mapping between CDMA Service Number and its description
|
# Mapping between CDMA Service Number and its description
|
||||||
58
pySim/profile/euicc.py
Normal file
58
pySim/profile/euicc.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
# Copyright (C) 2023 Harald Welte <laforge@osmocom.org>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||||
|
from pySim.commands import SimCardCommands
|
||||||
|
from pySim.euicc import CardApplicationISDR, AID_ISD_R, AID_ECASD, GetCertsReq, GetCertsResp
|
||||||
|
|
||||||
|
class CardProfileEuiccSGP32(CardProfileUICC):
|
||||||
|
ORDER = 5
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(name='IoT eUICC (SGP.32)')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||||
|
# try a command only supported by SGP.32
|
||||||
|
scc.cla_byte = "00"
|
||||||
|
scc.select_adf(AID_ISD_R)
|
||||||
|
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
|
||||||
|
|
||||||
|
class CardProfileEuiccSGP22(CardProfileUICC):
|
||||||
|
ORDER = 6
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(name='Consumer eUICC (SGP.22)')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||||
|
# try to read EID from ISD-R
|
||||||
|
scc.cla_byte = "00"
|
||||||
|
scc.select_adf(AID_ISD_R)
|
||||||
|
eid = CardApplicationISDR.get_eid(scc)
|
||||||
|
# TODO: Store EID identity?
|
||||||
|
|
||||||
|
class CardProfileEuiccSGP02(CardProfileUICC):
|
||||||
|
ORDER = 7
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(name='M2M eUICC (SGP.02)')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||||
|
scc.cla_byte = "00"
|
||||||
|
scc.select_adf(AID_ECASD)
|
||||||
|
scc.get_data(0x5a)
|
||||||
|
# TODO: Store EID identity?
|
||||||
394
pySim/profile/ts_102_221.py
Normal file
394
pySim/profile/ts_102_221.py
Normal file
@@ -0,0 +1,394 @@
|
|||||||
|
# coding=utf-8
|
||||||
|
"""Card Profile of ETSI TS 102 221, the core UICC spec.
|
||||||
|
|
||||||
|
(C) 2021-2024 by Harald Welte <laforge@osmocom.org>
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 2 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from construct import Struct, FlagsEnum, GreedyString
|
||||||
|
|
||||||
|
from osmocom.construct import *
|
||||||
|
from osmocom.utils import *
|
||||||
|
from osmocom.tlv import BER_TLV_IE
|
||||||
|
|
||||||
|
from pySim.utils import *
|
||||||
|
from pySim.filesystem import *
|
||||||
|
from pySim.profile import CardProfile
|
||||||
|
from pySim import iso7816_4
|
||||||
|
from pySim.ts_102_221 import decode_select_response, ts_102_22x_cmdset
|
||||||
|
from pySim.ts_102_221 import AM_DO_EF, SC_DO, AdditionalInterfacesSupport, AdditionalTermCapEuicc
|
||||||
|
from pySim.ts_102_221 import TerminalPowerSupply, ExtendedLchanTerminalSupport, TerminalCapability
|
||||||
|
|
||||||
|
# A UICC will usually also support 2G functionality. If this is the case, we
|
||||||
|
# need to add DF_GSM and DF_TELECOM along with the UICC related files
|
||||||
|
from pySim.profile.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
|
||||||
|
from pySim.profile.gsm_r import AddonGSMR
|
||||||
|
from pySim.profile.cdma_ruim import AddonRUIM
|
||||||
|
|
||||||
|
|
||||||
|
# TS 102 221 Section 13.1
|
||||||
|
class EF_DIR(LinFixedEF):
|
||||||
|
_test_de_encode = [
|
||||||
|
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
|
||||||
|
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
|
||||||
|
{ "application_label": "USim1" },
|
||||||
|
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
|
||||||
|
),
|
||||||
|
( '61194f10a0000000871004ffffffff890709000050054953696d31',
|
||||||
|
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
|
||||||
|
{ "application_label": "ISim1" } ] }
|
||||||
|
),
|
||||||
|
]
|
||||||
|
class ApplicationLabel(BER_TLV_IE, tag=0x50):
|
||||||
|
# TODO: UCS-2 coding option as per Annex A of TS 102 221
|
||||||
|
_construct = GreedyString('ascii')
|
||||||
|
|
||||||
|
# see https://github.com/PyCQA/pylint/issues/5794
|
||||||
|
#pylint: disable=undefined-variable
|
||||||
|
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
|
||||||
|
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
|
||||||
|
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
|
||||||
|
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
|
||||||
|
iso7816_4.ApplicationRelatedDOSet]):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
|
||||||
|
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
|
||||||
|
self._tlv = EF_DIR.ApplicationTemplate
|
||||||
|
|
||||||
|
|
||||||
|
# TS 102 221 Section 13.4
|
||||||
|
class EF_ARR(LinFixedEF):
|
||||||
|
_test_de_encode = [
|
||||||
|
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
|
||||||
|
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||||
|
{ "control_reference_template": "ADM1" } ],
|
||||||
|
[ { "access_mode": [ "write_append", "update_erase" ] },
|
||||||
|
{ "always": None } ],
|
||||||
|
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
|
||||||
|
{ "never": None } ],
|
||||||
|
[ { "command_header": { "INS": 212 } },
|
||||||
|
{ "control_reference_template": "ADM1" } ]
|
||||||
|
] ),
|
||||||
|
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
|
||||||
|
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||||
|
{ "always": None } ],
|
||||||
|
[ { "access_mode": [ "update_erase" ] },
|
||||||
|
{ "never": None } ],
|
||||||
|
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
|
||||||
|
{ "control_reference_template": "ADM1" } ],
|
||||||
|
[ { "command_header": { "INS": 212 } },
|
||||||
|
{ "control_reference_template": "ADM1" } ]
|
||||||
|
] ),
|
||||||
|
]
|
||||||
|
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
|
||||||
|
super().__init__(fid, sfid=sfid, name=name, desc=desc)
|
||||||
|
# add those commands to the general commands of a TransparentEF
|
||||||
|
self.shell_commands += [self.AddlShellCommands()]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def flatten(inp: list):
|
||||||
|
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
|
||||||
|
def sc_abbreviate(sc):
|
||||||
|
if 'always' in sc:
|
||||||
|
return 'always'
|
||||||
|
elif 'never' in sc:
|
||||||
|
return 'never'
|
||||||
|
elif 'control_reference_template' in sc:
|
||||||
|
return sc['control_reference_template']
|
||||||
|
else:
|
||||||
|
return sc
|
||||||
|
|
||||||
|
by_mode = {}
|
||||||
|
for t in inp:
|
||||||
|
am = t[0]
|
||||||
|
sc = t[1]
|
||||||
|
sc_abbr = sc_abbreviate(sc)
|
||||||
|
if 'access_mode' in am:
|
||||||
|
for m in am['access_mode']:
|
||||||
|
by_mode[m] = sc_abbr
|
||||||
|
elif 'command_header' in am:
|
||||||
|
ins = am['command_header']['INS']
|
||||||
|
if 'CLA' in am['command_header']:
|
||||||
|
cla = am['command_header']['CLA']
|
||||||
|
else:
|
||||||
|
cla = None
|
||||||
|
cmd = ts_102_22x_cmdset.lookup(ins, cla)
|
||||||
|
if cmd:
|
||||||
|
name = cmd.name.lower().replace(' ', '_')
|
||||||
|
by_mode[name] = sc_abbr
|
||||||
|
else:
|
||||||
|
raise ValueError
|
||||||
|
else:
|
||||||
|
raise ValueError
|
||||||
|
return by_mode
|
||||||
|
|
||||||
|
def _decode_record_bin(self, raw_bin_data, **kwargs):
|
||||||
|
# we can only guess if we should decode for EF or DF here :(
|
||||||
|
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||||
|
dec = arr_seq.decode_multi(raw_bin_data)
|
||||||
|
# we cannot pass the result through flatten() here, as we don't have a related
|
||||||
|
# 'un-flattening' decoder, and hence would be unable to encode :(
|
||||||
|
return dec[0]
|
||||||
|
|
||||||
|
def _encode_record_bin(self, in_json, **kwargs):
|
||||||
|
# we can only guess if we should decode for EF or DF here :(
|
||||||
|
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||||
|
return arr_seq.encode_multi(in_json)
|
||||||
|
|
||||||
|
@with_default_category('File-Specific Commands')
|
||||||
|
class AddlShellCommands(CommandSet):
|
||||||
|
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
|
||||||
|
def do_read_arr_record(self, opts):
|
||||||
|
"""Read one EF.ARR record in flattened, human-friendly form."""
|
||||||
|
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
|
||||||
|
data = self._cmd.lchan.selected_file.flatten(data)
|
||||||
|
self._cmd.poutput_json(data, opts.oneline)
|
||||||
|
|
||||||
|
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
|
||||||
|
def do_read_arr_records(self, opts):
|
||||||
|
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
|
||||||
|
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
|
||||||
|
# collect all results in list so they are rendered as JSON list when printing
|
||||||
|
data_list = []
|
||||||
|
for recnr in range(1, 1 + num_of_rec):
|
||||||
|
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
|
||||||
|
data = self._cmd.lchan.selected_file.flatten(data)
|
||||||
|
data_list.append(data)
|
||||||
|
self._cmd.poutput_json(data_list, opts.oneline)
|
||||||
|
|
||||||
|
|
||||||
|
# TS 102 221 Section 13.6
|
||||||
|
class EF_UMPC(TransparentEF):
|
||||||
|
_test_de_encode = [
|
||||||
|
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
|
||||||
|
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
|
||||||
|
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
|
||||||
|
"support_uicc_suspend": False } } ),
|
||||||
|
]
|
||||||
|
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
|
||||||
|
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
|
||||||
|
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
|
||||||
|
support_uicc_suspend=2)
|
||||||
|
self._construct = Struct(
|
||||||
|
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
|
||||||
|
|
||||||
|
|
||||||
|
class CardProfileUICC(CardProfile):
|
||||||
|
|
||||||
|
ORDER = 10
|
||||||
|
|
||||||
|
def __init__(self, name='UICC'):
|
||||||
|
files = [
|
||||||
|
EF_DIR(),
|
||||||
|
EF_ICCID(),
|
||||||
|
EF_PL(),
|
||||||
|
EF_ARR(),
|
||||||
|
# FIXME: DF.CD
|
||||||
|
EF_UMPC(),
|
||||||
|
]
|
||||||
|
addons = [
|
||||||
|
AddonSIM,
|
||||||
|
AddonGSMR,
|
||||||
|
AddonRUIM,
|
||||||
|
]
|
||||||
|
sw = {
|
||||||
|
'Normal': {
|
||||||
|
'9000': 'Normal ending of the command',
|
||||||
|
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
|
||||||
|
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
|
||||||
|
},
|
||||||
|
'Postponed processing': {
|
||||||
|
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
|
||||||
|
},
|
||||||
|
'Warnings': {
|
||||||
|
'6200': 'No information given, state of non-volatile memory unchanged',
|
||||||
|
'6281': 'Part of returned data may be corrupted',
|
||||||
|
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
|
||||||
|
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
|
||||||
|
'6284': 'Selected file in termination state',
|
||||||
|
'62f1': 'More data available',
|
||||||
|
'62f2': 'More data available and proactive command pending',
|
||||||
|
'62f3': 'Response data available',
|
||||||
|
'63f1': 'More data expected',
|
||||||
|
'63f2': 'More data expected and proactive command pending',
|
||||||
|
'63cx': 'Command successful but after using an internal update retry routine X times',
|
||||||
|
},
|
||||||
|
'Execution errors': {
|
||||||
|
'6400': 'No information given, state of non-volatile memory unchanged',
|
||||||
|
'6500': 'No information given, state of non-volatile memory changed',
|
||||||
|
'6581': 'Memory problem',
|
||||||
|
},
|
||||||
|
'Checking errors': {
|
||||||
|
'6700': 'Wrong length',
|
||||||
|
'67xx': 'The interpretation of this status word is command dependent',
|
||||||
|
'6b00': 'Wrong parameter(s) P1-P2',
|
||||||
|
'6d00': 'Instruction code not supported or invalid',
|
||||||
|
'6e00': 'Class not supported',
|
||||||
|
'6f00': 'Technical problem, no precise diagnosis',
|
||||||
|
'6fxx': 'The interpretation of this status word is command dependent',
|
||||||
|
},
|
||||||
|
'Functions in CLA not supported': {
|
||||||
|
'6800': 'No information given',
|
||||||
|
'6881': 'Logical channel not supported',
|
||||||
|
'6882': 'Secure messaging not supported',
|
||||||
|
},
|
||||||
|
'Command not allowed': {
|
||||||
|
'6900': 'No information given',
|
||||||
|
'6981': 'Command incompatible with file structure',
|
||||||
|
'6982': 'Security status not satisfied',
|
||||||
|
'6983': 'Authentication/PIN method blocked',
|
||||||
|
'6984': 'Referenced data invalidated',
|
||||||
|
'6985': 'Conditions of use not satisfied',
|
||||||
|
'6986': 'Command not allowed (no EF selected)',
|
||||||
|
'6989': 'Command not allowed - secure channel - security not satisfied',
|
||||||
|
},
|
||||||
|
'Wrong parameters': {
|
||||||
|
'6a80': 'Incorrect parameters in the data field',
|
||||||
|
'6a81': 'Function not supported',
|
||||||
|
'6a82': 'File not found',
|
||||||
|
'6a83': 'Record not found',
|
||||||
|
'6a84': 'Not enough memory space',
|
||||||
|
'6a86': 'Incorrect parameters P1 to P2',
|
||||||
|
'6a87': 'Lc inconsistent with P1 to P2',
|
||||||
|
'6a88': 'Referenced data not found',
|
||||||
|
},
|
||||||
|
'Application errors': {
|
||||||
|
'9850': 'INCREASE cannot be performed, max value reached',
|
||||||
|
'9862': 'Authentication error, application specific',
|
||||||
|
'9863': 'Security session or association expired',
|
||||||
|
'9864': 'Minimum UICC suspension time is too long',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
super().__init__(name, desc='ETSI TS 102 221', cla="00",
|
||||||
|
sel_ctrl="0004", files_in_mf=files, sw=sw,
|
||||||
|
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def decode_select_response(data_hex: str) -> object:
|
||||||
|
"""ETSI TS 102 221 Section 11.1.1.3"""
|
||||||
|
return decode_select_response(data_hex)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||||
|
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
|
||||||
|
card is considered a UICC card."""
|
||||||
|
cls._mf_select_test(scc, "00", "0004", ["3f00"])
|
||||||
|
|
||||||
|
@with_default_category('TS 102 221 Specific Commands')
|
||||||
|
class AddlShellCommands(CommandSet):
|
||||||
|
suspend_uicc_parser = argparse.ArgumentParser()
|
||||||
|
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
|
||||||
|
help='Proposed minimum duration of suspension')
|
||||||
|
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
|
||||||
|
help='Proposed maximum duration of suspension')
|
||||||
|
|
||||||
|
# not ISO7816-4 but TS 102 221
|
||||||
|
@cmd2.with_argparser(suspend_uicc_parser)
|
||||||
|
def do_suspend_uicc(self, opts):
|
||||||
|
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
|
||||||
|
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
|
||||||
|
max_len_secs=opts.max_duration_secs)
|
||||||
|
self._cmd.poutput(
|
||||||
|
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
|
||||||
|
|
||||||
|
resume_uicc_parser = argparse.ArgumentParser()
|
||||||
|
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
|
||||||
|
|
||||||
|
@cmd2.with_argparser(resume_uicc_parser)
|
||||||
|
def do_resume_uicc(self, opts):
|
||||||
|
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
|
||||||
|
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
|
||||||
|
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
|
||||||
|
self._cmd.card._scc.resume_uicc(opts.TOKEN)
|
||||||
|
|
||||||
|
term_cap_parser = argparse.ArgumentParser()
|
||||||
|
# power group
|
||||||
|
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
|
||||||
|
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
|
||||||
|
help='Actual used Supply voltage class')
|
||||||
|
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
|
||||||
|
help='Maximum available power supply of the terminal')
|
||||||
|
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
|
||||||
|
help='Actual used clock frequency (in units of 100kHz)')
|
||||||
|
# no separate groups for those two
|
||||||
|
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
|
||||||
|
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
|
||||||
|
help='Extended Logical Channel supported')
|
||||||
|
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
|
||||||
|
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
|
||||||
|
help='Local User Interface in the Device (LUId) supported')
|
||||||
|
# eUICC group
|
||||||
|
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
|
||||||
|
tc_euicc_grp.add_argument('--lui-d', action='store_true',
|
||||||
|
help='Local User Interface in the Device (LUId) supported')
|
||||||
|
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
|
||||||
|
help='Local Profile Download in the Device (LPDd) supported')
|
||||||
|
tc_euicc_grp.add_argument('--lds-d', action='store_true',
|
||||||
|
help='Local Discovery Service in the Device (LPDd) supported')
|
||||||
|
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
|
||||||
|
help='LUIe based on SCWS supported')
|
||||||
|
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
|
||||||
|
help='Metadata update alerting supported')
|
||||||
|
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
|
||||||
|
help='Enterprise Capable Device')
|
||||||
|
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
|
||||||
|
help='LUIe using E4E (ENVELOPE tag E4) supported')
|
||||||
|
tc_euicc_grp.add_argument('--lpr', action='store_true',
|
||||||
|
help='LPR (LPA Proxy) supported')
|
||||||
|
|
||||||
|
@cmd2.with_argparser(term_cap_parser)
|
||||||
|
def do_terminal_capability(self, opts):
|
||||||
|
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
|
||||||
|
ps_flags = {}
|
||||||
|
addl_if_flags = {}
|
||||||
|
euicc_flags = {}
|
||||||
|
|
||||||
|
opts_dict = vars(opts)
|
||||||
|
|
||||||
|
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
|
||||||
|
if any(opts_dict[x] for x in power_items):
|
||||||
|
if not all(opts_dict[x] for x in power_items):
|
||||||
|
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
|
||||||
|
|
||||||
|
for k, v in opts_dict.items():
|
||||||
|
if k in AdditionalInterfacesSupport._construct.flags.keys():
|
||||||
|
addl_if_flags[k] = v
|
||||||
|
elif k in AdditionalTermCapEuicc._construct.flags.keys():
|
||||||
|
euicc_flags[k] = v
|
||||||
|
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
|
||||||
|
if k == 'used_supply_voltage_class' and v:
|
||||||
|
v = {v: True}
|
||||||
|
ps_flags[k] = v
|
||||||
|
|
||||||
|
child_list = []
|
||||||
|
if any(x for x in ps_flags.values()):
|
||||||
|
child_list.append(TerminalPowerSupply(decoded=ps_flags))
|
||||||
|
|
||||||
|
if opts.extended_logical_channel:
|
||||||
|
child_list.append(ExtendedLchanTerminalSupport())
|
||||||
|
if any(x for x in addl_if_flags.values()):
|
||||||
|
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
|
||||||
|
if any(x for x in euicc_flags.values()):
|
||||||
|
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
|
||||||
|
|
||||||
|
print(child_list)
|
||||||
|
tc = TerminalCapability(children=child_list)
|
||||||
|
self.terminal_capability(b2h(tc.to_tlv()))
|
||||||
|
|
||||||
|
def terminal_capability(self, data:Hexstr):
|
||||||
|
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
|
||||||
|
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
||||||
@@ -43,7 +43,7 @@ from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_
|
|||||||
from pySim.profile import CardProfile, CardProfileAddon
|
from pySim.profile import CardProfile, CardProfileAddon
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
|
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
|
||||||
from pySim.gsm_r import AddonGSMR
|
from pySim.profile.gsm_r import AddonGSMR
|
||||||
|
|
||||||
# Mapping between SIM Service Number and its description
|
# Mapping between SIM Service Number and its description
|
||||||
EF_SST_map = {
|
EF_SST_map = {
|
||||||
@@ -25,6 +25,7 @@ from osmocom.utils import *
|
|||||||
from osmocom.construct import *
|
from osmocom.construct import *
|
||||||
|
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
|
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||||
from pySim.runtime import RuntimeState
|
from pySim.runtime import RuntimeState
|
||||||
import pySim
|
import pySim
|
||||||
|
|
||||||
@@ -180,7 +181,7 @@ class DF_SYSTEM(CardDF):
|
|||||||
self.add_files(files)
|
self.add_files(files)
|
||||||
|
|
||||||
def decode_select_response(self, resp_hex):
|
def decode_select_response(self, resp_hex):
|
||||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(resp_hex)
|
return CardProfileUICC.decode_select_response(resp_hex)
|
||||||
|
|
||||||
|
|
||||||
class EF_USIM_SQN(TransparentEF):
|
class EF_USIM_SQN(TransparentEF):
|
||||||
|
|||||||
@@ -328,11 +328,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
|||||||
from pySim.transport.pcsc import PcscSimLink
|
from pySim.transport.pcsc import PcscSimLink
|
||||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||||
from pySim.transport.calypso import CalypsoSimLink
|
from pySim.transport.calypso import CalypsoSimLink
|
||||||
|
from pySim.transport.wsrc import WsrcSimLink
|
||||||
|
|
||||||
SerialSimLink.argparse_add_reader_args(arg_parser)
|
SerialSimLink.argparse_add_reader_args(arg_parser)
|
||||||
PcscSimLink.argparse_add_reader_args(arg_parser)
|
PcscSimLink.argparse_add_reader_args(arg_parser)
|
||||||
ModemATCommandLink.argparse_add_reader_args(arg_parser)
|
ModemATCommandLink.argparse_add_reader_args(arg_parser)
|
||||||
CalypsoSimLink.argparse_add_reader_args(arg_parser)
|
CalypsoSimLink.argparse_add_reader_args(arg_parser)
|
||||||
|
WsrcSimLink.argparse_add_reader_args(arg_parser)
|
||||||
arg_parser.add_argument('--apdu-trace', action='store_true',
|
arg_parser.add_argument('--apdu-trace', action='store_true',
|
||||||
help='Trace the command/response APDUs exchanged with the card')
|
help='Trace the command/response APDUs exchanged with the card')
|
||||||
|
|
||||||
@@ -355,6 +357,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
|
|||||||
elif opts.modem_dev is not None:
|
elif opts.modem_dev is not None:
|
||||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||||
sl = ModemATCommandLink(opts, **kwargs)
|
sl = ModemATCommandLink(opts, **kwargs)
|
||||||
|
elif opts.wsrc_server_url is not None:
|
||||||
|
from pySim.transport.wsrc import WsrcSimLink
|
||||||
|
sl = WsrcSimLink(opts, **kwargs)
|
||||||
else: # Serial reader is default
|
else: # Serial reader is default
|
||||||
print("No reader/driver specified; falling back to default (Serial reader)")
|
print("No reader/driver specified; falling back to default (Serial reader)")
|
||||||
from pySim.transport.serial import SerialSimLink
|
from pySim.transport.serial import SerialSimLink
|
||||||
|
|||||||
99
pySim/transport/wsrc.py
Normal file
99
pySim/transport/wsrc.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
|
||||||
|
|
||||||
|
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
|
||||||
|
from pySim.transport import LinkBase
|
||||||
|
from pySim.utils import ResTuple
|
||||||
|
from pySim.wsrc import WSRC_DEFAULT_PORT_USER
|
||||||
|
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||||
|
|
||||||
|
class UserWsClientBlocking(WsClientBlocking):
|
||||||
|
def __init__(self, ws_uri: str, **kwargs):
|
||||||
|
super().__init__('user', ws_uri, **kwargs)
|
||||||
|
|
||||||
|
def select_card(self, id_type:str, id_str:str):
|
||||||
|
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
|
||||||
|
'select_card_ack')
|
||||||
|
return rx
|
||||||
|
|
||||||
|
def reset_card(self):
|
||||||
|
self.transceive_json('reset_req', {}, 'reset_resp')
|
||||||
|
|
||||||
|
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||||
|
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||||
|
return rx['response'], rx['sw']
|
||||||
|
|
||||||
|
|
||||||
|
class WsrcSimLink(LinkBase):
|
||||||
|
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
|
||||||
|
name = 'WSRC'
|
||||||
|
|
||||||
|
def __init__(self, opts: argparse.Namespace, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.identities = {}
|
||||||
|
self.server_url = opts.wsrc_server_url
|
||||||
|
if opts.wsrc_eid:
|
||||||
|
self.id_type = 'eid'
|
||||||
|
self.id_str = opts.wsrc_eid
|
||||||
|
elif opts.wsrc_iccid:
|
||||||
|
self.id_type = 'iccid'
|
||||||
|
self.id_str = opts.wsrc_iccid
|
||||||
|
self.client = UserWsClientBlocking(self.server_url)
|
||||||
|
self.client.connect()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# FIXME: disconnect from server
|
||||||
|
pass
|
||||||
|
|
||||||
|
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
rx = self.client.select_card(self.id_type, self.id_str)
|
||||||
|
self.identities = rx['identities']
|
||||||
|
|
||||||
|
def get_atr(self) -> Hexstr:
|
||||||
|
return h2i(self.identities['ATR'])
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
self.__delete__()
|
||||||
|
|
||||||
|
def _reset_card(self):
|
||||||
|
self.client.reset_card()
|
||||||
|
return 1
|
||||||
|
|
||||||
|
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
|
||||||
|
return self.client.xceive_apdu_raw(pdu)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||||
|
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
|
||||||
|
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
|
||||||
|
can be accessed via a network.""")
|
||||||
|
wsrc_group.add_argument('--wsrc-server-url', default='ws://localhost:%u' % WSRC_DEFAULT_PORT_USER,
|
||||||
|
help='URI of the WSRC server to connect to')
|
||||||
|
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
|
||||||
|
help='ICCID of the card to open via WSRC')
|
||||||
|
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
|
||||||
|
help='EID of the card to open via WSRC')
|
||||||
@@ -18,22 +18,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||||||
"""
|
"""
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
|
|
||||||
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, GreedyString, ValidationError
|
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, ValidationError
|
||||||
from construct import Optional as COptional, Computed
|
from construct import Optional as COptional, Computed
|
||||||
|
|
||||||
from osmocom.construct import *
|
from osmocom.construct import *
|
||||||
from osmocom.utils import *
|
from osmocom.utils import *
|
||||||
from osmocom.tlv import *
|
from osmocom.tlv import BER_TLV_IE, flatten_dict_lists
|
||||||
from pySim.utils import *
|
from pySim.utils import *
|
||||||
from pySim.filesystem import *
|
|
||||||
from pySim.profile import CardProfile
|
|
||||||
from pySim import iso7816_4
|
|
||||||
|
|
||||||
# A UICC will usually also support 2G functionality. If this is the case, we
|
#from pySim.filesystem import *
|
||||||
# need to add DF_GSM and DF_TELECOM along with the UICC related files
|
#from pySim.profile import CardProfile
|
||||||
from pySim.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
|
|
||||||
from pySim.gsm_r import AddonGSMR
|
|
||||||
from pySim.cdma_ruim import AddonRUIM
|
|
||||||
|
|
||||||
ts_102_22x_cmdset = CardCommandSet('TS 102 22x', [
|
ts_102_22x_cmdset = CardCommandSet('TS 102 22x', [
|
||||||
# TS 102 221 Section 10.1.2 Table 10.5 "Coding of Instruction Byte"
|
# TS 102 221 Section 10.1.2 Table 10.5 "Coding of Instruction Byte"
|
||||||
@@ -288,6 +282,12 @@ class FcpTemplate(BER_TLV_IE, tag=0x62, nested=[FileSize, TotalFileSize, FileDes
|
|||||||
PinStatusTemplate_DO]):
|
PinStatusTemplate_DO]):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def decode_select_response(data_hex: str) -> object:
|
||||||
|
"""ETSI TS 102 221 Section 11.1.1.3"""
|
||||||
|
t = FcpTemplate()
|
||||||
|
t.from_tlv(h2b(data_hex))
|
||||||
|
d = t.to_dict()
|
||||||
|
return flatten_dict_lists(d['fcp_template'])
|
||||||
|
|
||||||
def tlv_key_replace(inmap, indata):
|
def tlv_key_replace(inmap, indata):
|
||||||
def newkey(inmap, key):
|
def newkey(inmap, key):
|
||||||
@@ -634,361 +634,3 @@ NOT_DO = Nested_DO('not', 0xaf, NOT_Template)
|
|||||||
SC_DO = DataObjectChoice('security_condition', 'Security Condition',
|
SC_DO = DataObjectChoice('security_condition', 'Security Condition',
|
||||||
members=[Always_DO, Never_DO, SecCondByte_DO(), SecCondByte_DO(0x9e), CRT_DO(),
|
members=[Always_DO, Never_DO, SecCondByte_DO(), SecCondByte_DO(0x9e), CRT_DO(),
|
||||||
OR_DO, AND_DO, NOT_DO])
|
OR_DO, AND_DO, NOT_DO])
|
||||||
|
|
||||||
# TS 102 221 Section 13.1
|
|
||||||
class EF_DIR(LinFixedEF):
|
|
||||||
_test_de_encode = [
|
|
||||||
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
|
|
||||||
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
|
|
||||||
{ "application_label": "USim1" },
|
|
||||||
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
|
|
||||||
),
|
|
||||||
( '61194f10a0000000871004ffffffff890709000050054953696d31',
|
|
||||||
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
|
|
||||||
{ "application_label": "ISim1" } ] }
|
|
||||||
),
|
|
||||||
]
|
|
||||||
class ApplicationLabel(BER_TLV_IE, tag=0x50):
|
|
||||||
# TODO: UCS-2 coding option as per Annex A of TS 102 221
|
|
||||||
_construct = GreedyString('ascii')
|
|
||||||
|
|
||||||
# see https://github.com/PyCQA/pylint/issues/5794
|
|
||||||
#pylint: disable=undefined-variable
|
|
||||||
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
|
|
||||||
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
|
|
||||||
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
|
|
||||||
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
|
|
||||||
iso7816_4.ApplicationRelatedDOSet]):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
|
|
||||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
|
|
||||||
self._tlv = EF_DIR.ApplicationTemplate
|
|
||||||
|
|
||||||
|
|
||||||
# TS 102 221 Section 13.4
|
|
||||||
class EF_ARR(LinFixedEF):
|
|
||||||
_test_de_encode = [
|
|
||||||
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
|
|
||||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
|
||||||
{ "control_reference_template": "ADM1" } ],
|
|
||||||
[ { "access_mode": [ "write_append", "update_erase" ] },
|
|
||||||
{ "always": None } ],
|
|
||||||
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
|
|
||||||
{ "never": None } ],
|
|
||||||
[ { "command_header": { "INS": 212 } },
|
|
||||||
{ "control_reference_template": "ADM1" } ]
|
|
||||||
] ),
|
|
||||||
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
|
|
||||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
|
||||||
{ "always": None } ],
|
|
||||||
[ { "access_mode": [ "update_erase" ] },
|
|
||||||
{ "never": None } ],
|
|
||||||
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
|
|
||||||
{ "control_reference_template": "ADM1" } ],
|
|
||||||
[ { "command_header": { "INS": 212 } },
|
|
||||||
{ "control_reference_template": "ADM1" } ]
|
|
||||||
] ),
|
|
||||||
]
|
|
||||||
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
|
|
||||||
super().__init__(fid, sfid=sfid, name=name, desc=desc)
|
|
||||||
# add those commands to the general commands of a TransparentEF
|
|
||||||
self.shell_commands += [self.AddlShellCommands()]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def flatten(inp: list):
|
|
||||||
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
|
|
||||||
def sc_abbreviate(sc):
|
|
||||||
if 'always' in sc:
|
|
||||||
return 'always'
|
|
||||||
elif 'never' in sc:
|
|
||||||
return 'never'
|
|
||||||
elif 'control_reference_template' in sc:
|
|
||||||
return sc['control_reference_template']
|
|
||||||
else:
|
|
||||||
return sc
|
|
||||||
|
|
||||||
by_mode = {}
|
|
||||||
for t in inp:
|
|
||||||
am = t[0]
|
|
||||||
sc = t[1]
|
|
||||||
sc_abbr = sc_abbreviate(sc)
|
|
||||||
if 'access_mode' in am:
|
|
||||||
for m in am['access_mode']:
|
|
||||||
by_mode[m] = sc_abbr
|
|
||||||
elif 'command_header' in am:
|
|
||||||
ins = am['command_header']['INS']
|
|
||||||
if 'CLA' in am['command_header']:
|
|
||||||
cla = am['command_header']['CLA']
|
|
||||||
else:
|
|
||||||
cla = None
|
|
||||||
cmd = ts_102_22x_cmdset.lookup(ins, cla)
|
|
||||||
if cmd:
|
|
||||||
name = cmd.name.lower().replace(' ', '_')
|
|
||||||
by_mode[name] = sc_abbr
|
|
||||||
else:
|
|
||||||
raise ValueError
|
|
||||||
else:
|
|
||||||
raise ValueError
|
|
||||||
return by_mode
|
|
||||||
|
|
||||||
def _decode_record_bin(self, raw_bin_data, **kwargs):
|
|
||||||
# we can only guess if we should decode for EF or DF here :(
|
|
||||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
|
||||||
dec = arr_seq.decode_multi(raw_bin_data)
|
|
||||||
# we cannot pass the result through flatten() here, as we don't have a related
|
|
||||||
# 'un-flattening' decoder, and hence would be unable to encode :(
|
|
||||||
return dec[0]
|
|
||||||
|
|
||||||
def _encode_record_bin(self, in_json, **kwargs):
|
|
||||||
# we can only guess if we should decode for EF or DF here :(
|
|
||||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
|
||||||
return arr_seq.encode_multi(in_json)
|
|
||||||
|
|
||||||
@with_default_category('File-Specific Commands')
|
|
||||||
class AddlShellCommands(CommandSet):
|
|
||||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
|
|
||||||
def do_read_arr_record(self, opts):
|
|
||||||
"""Read one EF.ARR record in flattened, human-friendly form."""
|
|
||||||
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
|
|
||||||
data = self._cmd.lchan.selected_file.flatten(data)
|
|
||||||
self._cmd.poutput_json(data, opts.oneline)
|
|
||||||
|
|
||||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
|
|
||||||
def do_read_arr_records(self, opts):
|
|
||||||
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
|
|
||||||
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
|
|
||||||
# collect all results in list so they are rendered as JSON list when printing
|
|
||||||
data_list = []
|
|
||||||
for recnr in range(1, 1 + num_of_rec):
|
|
||||||
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
|
|
||||||
data = self._cmd.lchan.selected_file.flatten(data)
|
|
||||||
data_list.append(data)
|
|
||||||
self._cmd.poutput_json(data_list, opts.oneline)
|
|
||||||
|
|
||||||
|
|
||||||
# TS 102 221 Section 13.6
|
|
||||||
class EF_UMPC(TransparentEF):
|
|
||||||
_test_de_encode = [
|
|
||||||
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
|
|
||||||
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
|
|
||||||
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
|
|
||||||
"support_uicc_suspend": False } } ),
|
|
||||||
]
|
|
||||||
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
|
|
||||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
|
|
||||||
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
|
|
||||||
support_uicc_suspend=2)
|
|
||||||
self._construct = Struct(
|
|
||||||
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
|
|
||||||
|
|
||||||
|
|
||||||
class CardProfileUICC(CardProfile):
|
|
||||||
|
|
||||||
ORDER = 10
|
|
||||||
|
|
||||||
def __init__(self, name='UICC'):
|
|
||||||
files = [
|
|
||||||
EF_DIR(),
|
|
||||||
EF_ICCID(),
|
|
||||||
EF_PL(),
|
|
||||||
EF_ARR(),
|
|
||||||
# FIXME: DF.CD
|
|
||||||
EF_UMPC(),
|
|
||||||
]
|
|
||||||
addons = [
|
|
||||||
AddonSIM,
|
|
||||||
AddonGSMR,
|
|
||||||
AddonRUIM,
|
|
||||||
]
|
|
||||||
sw = {
|
|
||||||
'Normal': {
|
|
||||||
'9000': 'Normal ending of the command',
|
|
||||||
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
|
|
||||||
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
|
|
||||||
},
|
|
||||||
'Postponed processing': {
|
|
||||||
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
|
|
||||||
},
|
|
||||||
'Warnings': {
|
|
||||||
'6200': 'No information given, state of non-volatile memory unchanged',
|
|
||||||
'6281': 'Part of returned data may be corrupted',
|
|
||||||
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
|
|
||||||
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
|
|
||||||
'6284': 'Selected file in termination state',
|
|
||||||
'62f1': 'More data available',
|
|
||||||
'62f2': 'More data available and proactive command pending',
|
|
||||||
'62f3': 'Response data available',
|
|
||||||
'63f1': 'More data expected',
|
|
||||||
'63f2': 'More data expected and proactive command pending',
|
|
||||||
'63cx': 'Command successful but after using an internal update retry routine X times',
|
|
||||||
},
|
|
||||||
'Execution errors': {
|
|
||||||
'6400': 'No information given, state of non-volatile memory unchanged',
|
|
||||||
'6500': 'No information given, state of non-volatile memory changed',
|
|
||||||
'6581': 'Memory problem',
|
|
||||||
},
|
|
||||||
'Checking errors': {
|
|
||||||
'6700': 'Wrong length',
|
|
||||||
'67xx': 'The interpretation of this status word is command dependent',
|
|
||||||
'6b00': 'Wrong parameter(s) P1-P2',
|
|
||||||
'6d00': 'Instruction code not supported or invalid',
|
|
||||||
'6e00': 'Class not supported',
|
|
||||||
'6f00': 'Technical problem, no precise diagnosis',
|
|
||||||
'6fxx': 'The interpretation of this status word is command dependent',
|
|
||||||
},
|
|
||||||
'Functions in CLA not supported': {
|
|
||||||
'6800': 'No information given',
|
|
||||||
'6881': 'Logical channel not supported',
|
|
||||||
'6882': 'Secure messaging not supported',
|
|
||||||
},
|
|
||||||
'Command not allowed': {
|
|
||||||
'6900': 'No information given',
|
|
||||||
'6981': 'Command incompatible with file structure',
|
|
||||||
'6982': 'Security status not satisfied',
|
|
||||||
'6983': 'Authentication/PIN method blocked',
|
|
||||||
'6984': 'Referenced data invalidated',
|
|
||||||
'6985': 'Conditions of use not satisfied',
|
|
||||||
'6986': 'Command not allowed (no EF selected)',
|
|
||||||
'6989': 'Command not allowed - secure channel - security not satisfied',
|
|
||||||
},
|
|
||||||
'Wrong parameters': {
|
|
||||||
'6a80': 'Incorrect parameters in the data field',
|
|
||||||
'6a81': 'Function not supported',
|
|
||||||
'6a82': 'File not found',
|
|
||||||
'6a83': 'Record not found',
|
|
||||||
'6a84': 'Not enough memory space',
|
|
||||||
'6a86': 'Incorrect parameters P1 to P2',
|
|
||||||
'6a87': 'Lc inconsistent with P1 to P2',
|
|
||||||
'6a88': 'Referenced data not found',
|
|
||||||
},
|
|
||||||
'Application errors': {
|
|
||||||
'9850': 'INCREASE cannot be performed, max value reached',
|
|
||||||
'9862': 'Authentication error, application specific',
|
|
||||||
'9863': 'Security session or association expired',
|
|
||||||
'9864': 'Minimum UICC suspension time is too long',
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
super().__init__(name, desc='ETSI TS 102 221', cla="00",
|
|
||||||
sel_ctrl="0004", files_in_mf=files, sw=sw,
|
|
||||||
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def decode_select_response(data_hex: str) -> object:
|
|
||||||
"""ETSI TS 102 221 Section 11.1.1.3"""
|
|
||||||
t = FcpTemplate()
|
|
||||||
t.from_tlv(h2b(data_hex))
|
|
||||||
d = t.to_dict()
|
|
||||||
return flatten_dict_lists(d['fcp_template'])
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
|
||||||
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
|
|
||||||
card is considered a UICC card."""
|
|
||||||
cls._mf_select_test(scc, "00", "0004", ["3f00"])
|
|
||||||
|
|
||||||
@with_default_category('TS 102 221 Specific Commands')
|
|
||||||
class AddlShellCommands(CommandSet):
|
|
||||||
suspend_uicc_parser = argparse.ArgumentParser()
|
|
||||||
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
|
|
||||||
help='Proposed minimum duration of suspension')
|
|
||||||
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
|
|
||||||
help='Proposed maximum duration of suspension')
|
|
||||||
|
|
||||||
# not ISO7816-4 but TS 102 221
|
|
||||||
@cmd2.with_argparser(suspend_uicc_parser)
|
|
||||||
def do_suspend_uicc(self, opts):
|
|
||||||
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
|
|
||||||
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
|
|
||||||
max_len_secs=opts.max_duration_secs)
|
|
||||||
self._cmd.poutput(
|
|
||||||
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
|
|
||||||
|
|
||||||
resume_uicc_parser = argparse.ArgumentParser()
|
|
||||||
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
|
|
||||||
|
|
||||||
@cmd2.with_argparser(resume_uicc_parser)
|
|
||||||
def do_resume_uicc(self, opts):
|
|
||||||
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
|
|
||||||
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
|
|
||||||
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
|
|
||||||
self._cmd.card._scc.resume_uicc(opts.TOKEN)
|
|
||||||
|
|
||||||
term_cap_parser = argparse.ArgumentParser()
|
|
||||||
# power group
|
|
||||||
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
|
|
||||||
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
|
|
||||||
help='Actual used Supply voltage class')
|
|
||||||
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
|
|
||||||
help='Maximum available power supply of the terminal')
|
|
||||||
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
|
|
||||||
help='Actual used clock frequency (in units of 100kHz)')
|
|
||||||
# no separate groups for those two
|
|
||||||
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
|
|
||||||
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
|
|
||||||
help='Extended Logical Channel supported')
|
|
||||||
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
|
|
||||||
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
|
|
||||||
help='Local User Interface in the Device (LUId) supported')
|
|
||||||
# eUICC group
|
|
||||||
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
|
|
||||||
tc_euicc_grp.add_argument('--lui-d', action='store_true',
|
|
||||||
help='Local User Interface in the Device (LUId) supported')
|
|
||||||
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
|
|
||||||
help='Local Profile Download in the Device (LPDd) supported')
|
|
||||||
tc_euicc_grp.add_argument('--lds-d', action='store_true',
|
|
||||||
help='Local Discovery Service in the Device (LPDd) supported')
|
|
||||||
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
|
|
||||||
help='LUIe based on SCWS supported')
|
|
||||||
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
|
|
||||||
help='Metadata update alerting supported')
|
|
||||||
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
|
|
||||||
help='Enterprise Capable Device')
|
|
||||||
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
|
|
||||||
help='LUIe using E4E (ENVELOPE tag E4) supported')
|
|
||||||
tc_euicc_grp.add_argument('--lpr', action='store_true',
|
|
||||||
help='LPR (LPA Proxy) supported')
|
|
||||||
|
|
||||||
@cmd2.with_argparser(term_cap_parser)
|
|
||||||
def do_terminal_capability(self, opts):
|
|
||||||
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
|
|
||||||
ps_flags = {}
|
|
||||||
addl_if_flags = {}
|
|
||||||
euicc_flags = {}
|
|
||||||
|
|
||||||
opts_dict = vars(opts)
|
|
||||||
|
|
||||||
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
|
|
||||||
if any(opts_dict[x] for x in power_items):
|
|
||||||
if not all(opts_dict[x] for x in power_items):
|
|
||||||
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
|
|
||||||
|
|
||||||
for k, v in opts_dict.items():
|
|
||||||
if k in AdditionalInterfacesSupport._construct.flags.keys():
|
|
||||||
addl_if_flags[k] = v
|
|
||||||
elif k in AdditionalTermCapEuicc._construct.flags.keys():
|
|
||||||
euicc_flags[k] = v
|
|
||||||
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
|
|
||||||
if k == 'used_supply_voltage_class' and v:
|
|
||||||
v = {v: True}
|
|
||||||
ps_flags[k] = v
|
|
||||||
|
|
||||||
child_list = []
|
|
||||||
if any(x for x in ps_flags.values()):
|
|
||||||
child_list.append(TerminalPowerSupply(decoded=ps_flags))
|
|
||||||
|
|
||||||
if opts.extended_logical_channel:
|
|
||||||
child_list.append(ExtendedLchanTerminalSupport())
|
|
||||||
if any(x for x in addl_if_flags.values()):
|
|
||||||
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
|
|
||||||
if any(x for x in euicc_flags.values()):
|
|
||||||
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
|
|
||||||
|
|
||||||
print(child_list)
|
|
||||||
tc = TerminalCapability(children=child_list)
|
|
||||||
self.terminal_capability(b2h(tc.to_tlv()))
|
|
||||||
|
|
||||||
def terminal_capability(self, data:Hexstr):
|
|
||||||
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
|
|
||||||
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
|
||||||
|
|||||||
@@ -36,14 +36,13 @@ from osmocom.utils import is_hexstr
|
|||||||
from osmocom.tlv import *
|
from osmocom.tlv import *
|
||||||
from osmocom.construct import *
|
from osmocom.construct import *
|
||||||
|
|
||||||
import pySim.ts_102_221
|
from pySim.profile.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
|
||||||
from pySim.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
|
from pySim.profile.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
|
||||||
from pySim.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
|
from pySim.profile.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
|
||||||
from pySim.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
|
from pySim.profile.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
|
||||||
from pySim.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
|
from pySim.profile.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
|
||||||
from pySim.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
|
from pySim.profile.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
|
||||||
from pySim.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
|
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||||
from pySim.ts_102_221 import EF_ARR
|
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
from pySim.ts_31_102_telecom import DF_PHONEBOOK, EF_UServiceTable
|
from pySim.ts_31_102_telecom import DF_PHONEBOOK, EF_UServiceTable
|
||||||
from pySim.ts_31_103_shared import EF_IMSConfigData, EF_XCAPConfigData, EF_MuDMiDConfigData
|
from pySim.ts_31_103_shared import EF_IMSConfigData, EF_XCAPConfigData, EF_MuDMiDConfigData
|
||||||
@@ -1750,7 +1749,7 @@ class ADF_USIM(CardADF):
|
|||||||
self.add_files(files)
|
self.add_files(files)
|
||||||
|
|
||||||
def decode_select_response(self, data_hex):
|
def decode_select_response(self, data_hex):
|
||||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
return CardProfileUICC.decode_select_response(data_hex)
|
||||||
|
|
||||||
@with_default_category('Application-Specific Commands')
|
@with_default_category('Application-Specific Commands')
|
||||||
class AddlShellCommands(CommandSet):
|
class AddlShellCommands(CommandSet):
|
||||||
|
|||||||
@@ -27,12 +27,11 @@ from osmocom.utils import *
|
|||||||
from osmocom.tlv import *
|
from osmocom.tlv import *
|
||||||
from osmocom.construct import *
|
from osmocom.construct import *
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
from pySim.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
|
from pySim.profile.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
|
||||||
from pySim.ts_31_102 import ADF_USIM, EF_FromPreferred
|
from pySim.ts_31_102 import ADF_USIM, EF_FromPreferred
|
||||||
from pySim.ts_31_102_telecom import EF_UServiceTable
|
from pySim.ts_31_102_telecom import EF_UServiceTable
|
||||||
from pySim.ts_31_103_shared import *
|
from pySim.ts_31_103_shared import *
|
||||||
import pySim.ts_102_221
|
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||||
from pySim.ts_102_221 import EF_ARR
|
|
||||||
|
|
||||||
# Mapping between ISIM Service Number and its description
|
# Mapping between ISIM Service Number and its description
|
||||||
EF_IST_map = {
|
EF_IST_map = {
|
||||||
@@ -226,7 +225,7 @@ class ADF_ISIM(CardADF):
|
|||||||
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
||||||
|
|
||||||
def decode_select_response(self, data_hex):
|
def decode_select_response(self, data_hex):
|
||||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
return CardProfileUICC.decode_select_response(data_hex)
|
||||||
|
|
||||||
|
|
||||||
# TS 31.103 Section 7.1
|
# TS 31.103 Section 7.1
|
||||||
|
|||||||
@@ -24,9 +24,8 @@ from osmocom.utils import *
|
|||||||
from osmocom.tlv import *
|
from osmocom.tlv import *
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
from pySim.ts_31_102 import ADF_USIM
|
from pySim.ts_31_102 import ADF_USIM
|
||||||
from pySim.ts_51_011 import EF_IMSI, EF_AD
|
from pySim.profile.ts_51_011 import EF_IMSI, EF_AD
|
||||||
import pySim.ts_102_221
|
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||||
from pySim.ts_102_221 import EF_ARR
|
|
||||||
|
|
||||||
|
|
||||||
class ADF_HPSIM(CardADF):
|
class ADF_HPSIM(CardADF):
|
||||||
@@ -44,7 +43,7 @@ class ADF_HPSIM(CardADF):
|
|||||||
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
||||||
|
|
||||||
def decode_select_response(self, data_hex):
|
def decode_select_response(self, data_hex):
|
||||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
return CardProfileUICC.decode_select_response(data_hex)
|
||||||
|
|
||||||
|
|
||||||
# TS 31.104 Section 7.1
|
# TS 31.104 Section 7.1
|
||||||
|
|||||||
3
pySim/wsrc/__init__.py
Normal file
3
pySim/wsrc/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
|
||||||
|
WSRC_DEFAULT_PORT_USER = 4220
|
||||||
|
WSRC_DEFAULT_PORT_CARD = 4221
|
||||||
68
pySim/wsrc/client_blocking.py
Normal file
68
pySim/wsrc/client_blocking.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||||
|
perform commands on it."""
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
from websockets.sync.client import connect
|
||||||
|
from osmocom.utils import b2h
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class WsClientBlocking(abc.ABC):
|
||||||
|
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
|
||||||
|
|
||||||
|
def __init__(self, cltype: str, ws_uri: str):
|
||||||
|
self.client_type = cltype
|
||||||
|
self.ws_uri = ws_uri
|
||||||
|
self.ws = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.ws = connect(uri=self.ws_uri)
|
||||||
|
self.perform_outbound_hello()
|
||||||
|
|
||||||
|
def tx_json(self, msg_type: str, d: dict = {}):
|
||||||
|
"""JSON-Encode and transmit a message to the given websocket."""
|
||||||
|
d['msg_type'] = msg_type
|
||||||
|
d_js = json.dumps(d)
|
||||||
|
logger.debug("Tx: %s", d_js)
|
||||||
|
self.ws.send(d_js)
|
||||||
|
|
||||||
|
def tx_error(self, message: str):
|
||||||
|
event = {
|
||||||
|
'message': message,
|
||||||
|
}
|
||||||
|
self.tx_json('error', event)
|
||||||
|
|
||||||
|
def rx_json(self):
|
||||||
|
"""Receive a single message from the given websocket and JSON-decode it."""
|
||||||
|
rx = self.ws.recv()
|
||||||
|
rx_js = json.loads(rx)
|
||||||
|
logger.debug("Rx: %s", rx_js)
|
||||||
|
assert 'msg_type' in rx
|
||||||
|
return rx_js
|
||||||
|
|
||||||
|
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
|
||||||
|
self.tx_json(tx_msg_type, tx_d)
|
||||||
|
rx = self.rx_json()
|
||||||
|
assert rx['msg_type'] == rx_msg_type
|
||||||
|
return rx
|
||||||
|
|
||||||
|
def perform_outbound_hello(self, tx: dict = {}):
|
||||||
|
if not 'client_type' in tx:
|
||||||
|
tx['client_type'] = self.client_type
|
||||||
|
self.tx_json('hello', tx)
|
||||||
|
rx = self.rx_json()
|
||||||
|
assert rx['msg_type'] == 'hello_ack'
|
||||||
|
return rx
|
||||||
|
|
||||||
|
def rx_and_execute_cmd(self):
|
||||||
|
"""Receve and dispatch/execute a single command from the server."""
|
||||||
|
rx = self.rx_json()
|
||||||
|
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
|
||||||
|
if handler:
|
||||||
|
handler(rx)
|
||||||
|
else:
|
||||||
|
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
|
||||||
|
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])
|
||||||
@@ -25,10 +25,10 @@ import pySim.ts_102_221
|
|||||||
import pySim.ts_102_222
|
import pySim.ts_102_222
|
||||||
import pySim.ts_31_102
|
import pySim.ts_31_102
|
||||||
import pySim.ts_31_103
|
import pySim.ts_31_103
|
||||||
import pySim.ts_51_011
|
import pySim.profile.ts_51_011
|
||||||
import pySim.sysmocom_sja2
|
import pySim.sysmocom_sja2
|
||||||
import pySim.gsm_r
|
import pySim.profile.gsm_r
|
||||||
import pySim.cdma_ruim
|
import pySim.profile.cdma_ruim
|
||||||
|
|
||||||
from construct import Int8ub, Struct, Padding, this
|
from construct import Int8ub, Struct, Padding, this
|
||||||
from osmocom.tlv import BER_TLV_IE
|
from osmocom.tlv import BER_TLV_IE
|
||||||
|
|||||||
@@ -26,10 +26,10 @@ import pySim.ts_102_221
|
|||||||
import pySim.ts_102_222
|
import pySim.ts_102_222
|
||||||
import pySim.ts_31_102
|
import pySim.ts_31_102
|
||||||
import pySim.ts_31_103
|
import pySim.ts_31_103
|
||||||
import pySim.ts_51_011
|
import pySim.profile.ts_51_011
|
||||||
import pySim.sysmocom_sja2
|
import pySim.sysmocom_sja2
|
||||||
import pySim.gsm_r
|
import pySim.profile.gsm_r
|
||||||
import pySim.cdma_ruim
|
import pySim.profile.cdma_ruim
|
||||||
import pySim.global_platform
|
import pySim.global_platform
|
||||||
import pySim.global_platform.http
|
import pySim.global_platform.http
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user