Compare commits
5 Commits
lynxis/esi
...
laforge/ws
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9bb63a2db | ||
|
|
a01e87da77 | ||
|
|
53e840ad86 | ||
|
|
d15c3d1319 | ||
|
|
671b0f19b6 |
112
contrib/wsrc_card_client.py
Executable file
112
contrib/wsrc_card_client.py
Executable file
@@ -0,0 +1,112 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import argparse
|
||||
from osmocom.utils import b2h
|
||||
import websockets
|
||||
|
||||
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
from pySim.exceptions import NoCardError
|
||||
from pySim.wsrc import WSRC_DEFAULT_PORT_CARD
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CardWsClientBlocking(WsClientBlocking):
|
||||
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, ws_uri, tp: LinkBase):
|
||||
super().__init__('card', ws_uri)
|
||||
self.tp = tp
|
||||
|
||||
def perform_outbound_hello(self):
|
||||
hello_data = {
|
||||
'atr': b2h(self.tp.get_atr()),
|
||||
# TODO: include various card information in the HELLO message
|
||||
}
|
||||
super().perform_outbound_hello(hello_data)
|
||||
|
||||
def handle_rx_c_apdu(self, rx: dict):
|
||||
"""handle an inbound APDU transceive command"""
|
||||
data, sw = self.tp.send_apdu(rx['command'])
|
||||
tx = {
|
||||
'response': data,
|
||||
'sw': sw,
|
||||
}
|
||||
self.tx_json('r_apdu', tx)
|
||||
|
||||
def handle_rx_disconnect(self, rx: dict):
|
||||
"""server tells us to disconnect"""
|
||||
self.tx_json('disconnect_ack')
|
||||
# FIXME: tear down connection and/or terminate entire program
|
||||
|
||||
def handle_rx_state_notification(self, rx: dict):
|
||||
logger.info("State Notification: %s" % rx['new_state'])
|
||||
|
||||
def handle_rx_print(self, rx: dict):
|
||||
"""print a message (text) given by server to the local console/log"""
|
||||
logger.info("SERVER MSG: %s" % rx['message'])
|
||||
# no response
|
||||
|
||||
def handle_rx_reset_req(self, rx: dict):
|
||||
"""server tells us to reset the card"""
|
||||
self.tp.reset_card()
|
||||
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
argparse_add_reader_args(parser)
|
||||
parser.add_argument("--uri", default="ws://localhost:%u/" % (WSRC_DEFAULT_PORT_CARD),
|
||||
help="URI of the sever to which to connect")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
# open the card reader / slot
|
||||
logger.info("Initializing Card Reader...")
|
||||
try:
|
||||
tp = init_reader(opts)
|
||||
except Exception as e:
|
||||
logger.fatal("Error opening reader: %s" % e)
|
||||
sys.exit(1)
|
||||
|
||||
logger.info("Connecting to Card...")
|
||||
try:
|
||||
tp.connect()
|
||||
except NoCardError as e:
|
||||
logger.fatal("Error opening card! Is a card inserted in the reader?")
|
||||
sys.exit(1)
|
||||
|
||||
scc = SimCardCommands(transport=tp)
|
||||
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
|
||||
|
||||
# TODO: gather various information about the card; print it
|
||||
|
||||
# create + connect the client to the server
|
||||
cl = CardWsClientBlocking(opts.uri, tp)
|
||||
logger.info("Connecting to remote server...")
|
||||
try:
|
||||
cl.connect()
|
||||
logger.info("Successfully connected to Server")
|
||||
except ConnectionRefusedError as e:
|
||||
logger.fatal(e)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# endless loop: wait for inbound command from server + execute it
|
||||
cl.rx_and_execute_cmd()
|
||||
except websockets.exceptions.ConnectionClosedOK as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt as e:
|
||||
print(e.__class__.__name__)
|
||||
sys.exit(2)
|
||||
354
contrib/wsrc_server.py
Executable file
354
contrib/wsrc_server.py
Executable file
@@ -0,0 +1,354 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
import argparse
|
||||
from typing import Optional, Tuple
|
||||
from websockets.asyncio.server import serve
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
from osmocom.utils import Hexstr, swap_nibbles
|
||||
|
||||
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
|
||||
from pySim.exceptions import SwMatchError
|
||||
from pySim.wsrc import WSRC_DEFAULT_PORT_USER, WSRC_DEFAULT_PORT_CARD
|
||||
|
||||
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
card_clients = set()
|
||||
user_clients = set()
|
||||
|
||||
class WsClientLogAdapter(logging.LoggerAdapter):
|
||||
"""LoggerAdapter adding context (for now: remote IP/Port of client) to log"""
|
||||
def process(self, msg, kwargs):
|
||||
return '%s([%s]:%u) %s' % (self.extra['type'], self.extra['remote_addr'][0],
|
||||
self.extra['remote_addr'][1], msg), kwargs
|
||||
|
||||
class WsClient:
|
||||
def __init__(self, websocket, hello: dict):
|
||||
self.websocket = websocket
|
||||
self.hello = hello
|
||||
self.identity = {}
|
||||
self.logger = WsClientLogAdapter(logger, {'type': self.__class__.__name__,
|
||||
'remote_addr': websocket.remote_address})
|
||||
|
||||
def __str__(self):
|
||||
return '%s([%s]:%u)' % (self.__class__.__name__, self.websocket.remote_address[0],
|
||||
self.websocket.remote_address[1])
|
||||
|
||||
async def rx_json(self):
|
||||
rx = await self.websocket.recv()
|
||||
rx_js = json.loads(rx)
|
||||
self.logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
async def tx_json(self, msg_type:str, d: dict = {}):
|
||||
"""Transmit a json-serializable dict to the peer"""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
self.logger.debug("Tx: %s", d_js)
|
||||
await self.websocket.send(d_js)
|
||||
|
||||
async def tx_hello_ack(self):
|
||||
await self.tx_json('hello_ack')
|
||||
|
||||
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
|
||||
await self.tx_json(msg_type, d)
|
||||
rx = await self.rx_json()
|
||||
if exp_msg_type:
|
||||
assert rx['msg_type'] == exp_msg_type
|
||||
return rx;
|
||||
|
||||
async def tx_error(self, message: str):
|
||||
"""Transmit an error message to the peer"""
|
||||
event = {
|
||||
"message": message,
|
||||
}
|
||||
self.logger.error("Transmitting error message: '%s'" % message)
|
||||
await self.tx_json('error', event)
|
||||
|
||||
# async def ws_hdlr(self):
|
||||
# """kind of a 'main' function for the websocket client: wait for incoming message,
|
||||
# and handle it."""
|
||||
# try:
|
||||
# async for message in self.websocket:
|
||||
# method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
|
||||
# if not method:
|
||||
# await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
|
||||
# else:
|
||||
# method(message)
|
||||
# except ConnectionClosedError:
|
||||
# # we handle this in the outer loop
|
||||
# pass
|
||||
|
||||
class CardClient(WsClient):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.identity['ATR'] = self.hello['atr']
|
||||
self.state = 'init'
|
||||
|
||||
def __str__(self):
|
||||
eid = self.identity.get('EID', None)
|
||||
if eid:
|
||||
return '%s(EID=%s)' % (self.__class__.__name__, eid)
|
||||
iccid = self.identity.get('ICCID', None)
|
||||
if iccid:
|
||||
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
|
||||
return super().__str__()
|
||||
|
||||
def listing_entry(self) -> dict:
|
||||
return {'remote_addr': self.websocket.remote_address,
|
||||
'identities': self.identity,
|
||||
'state': self.state}
|
||||
|
||||
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
|
||||
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
"""transceive a single APDU with the card"""
|
||||
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return message['response'], message['sw']
|
||||
|
||||
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
|
||||
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
|
||||
prev_pdu = pdu
|
||||
data, sw = await self.xceive_apdu_raw(pdu)
|
||||
|
||||
if sw is not None:
|
||||
while (sw[0:2] in ['9f', '61', '62', '63']):
|
||||
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
|
||||
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
|
||||
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
|
||||
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
|
||||
prev_pdu = pdu_gr
|
||||
d, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
data += d
|
||||
if sw[0:2] == '6c':
|
||||
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
|
||||
pdu_gr = prev_pdu[0:8] + sw[2:4]
|
||||
data, sw = await self.xceive_apdu_raw(pdu_gr)
|
||||
|
||||
return data, sw
|
||||
|
||||
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
|
||||
"""like xceive_apdu, but checking the status word matches the expected pattern"""
|
||||
rv = await self.xceive_apdu(pdu)
|
||||
last_sw = rv[1]
|
||||
|
||||
if not sw_match(rv[1], sw):
|
||||
raise SwMatchError(rv[1], sw.lower())
|
||||
return rv
|
||||
|
||||
async def card_reset(self):
|
||||
"""reset the card"""
|
||||
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
|
||||
self.identity['ATR'] = rx['atr']
|
||||
|
||||
async def notify_state(self):
|
||||
"""notify the card client of a state [change]"""
|
||||
await self.tx_json('state_notification', {'new_state': self.state})
|
||||
|
||||
async def get_iccid(self):
|
||||
"""high-level method to obtain the ICCID of the card"""
|
||||
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
|
||||
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
|
||||
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
|
||||
return dec_iccid(res)
|
||||
|
||||
async def get_eid_sgp22(self):
|
||||
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
|
||||
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
|
||||
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
|
||||
return res[-32:]
|
||||
|
||||
async def set_state(self, new_state:str):
|
||||
self.logger.info("Card now in '%s' state" % new_state)
|
||||
self.state = new_state
|
||||
await self.notify_state()
|
||||
|
||||
async def identify(self):
|
||||
# identify the card by asking for its EID and/or ICCID
|
||||
try:
|
||||
eid = await self.get_eid_sgp22()
|
||||
self.logger.debug("EID: %s", eid)
|
||||
self.identity['EID'] = eid
|
||||
except SwMatchError:
|
||||
pass
|
||||
try:
|
||||
iccid = await self.get_iccid()
|
||||
self.logger.debug("ICCID: %s", iccid)
|
||||
self.identity['ICCID'] = iccid
|
||||
except SwMatchError:
|
||||
pass
|
||||
await self.set_state('ready')
|
||||
|
||||
async def associate_user(self, user):
|
||||
assert self.state == 'ready'
|
||||
self.user = user
|
||||
await self.set_state('associated')
|
||||
|
||||
async def disassociate_user(self):
|
||||
assert self.user
|
||||
assert self.state == 'associated'
|
||||
await self.set_state('ready')
|
||||
|
||||
@staticmethod
|
||||
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
|
||||
for c in card_clients:
|
||||
if c.state != 'ready':
|
||||
continue
|
||||
c_id = c.identity.get(id_type.upper(), None)
|
||||
if c_id and c_id.lower() == id_str.lower():
|
||||
return c
|
||||
return None
|
||||
|
||||
class UserClient(WsClient):
|
||||
"""A websocket client representing a user application like pySim-shell."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.state = 'init'
|
||||
self.card = None
|
||||
|
||||
async def associate_card(self, card: CardClient):
|
||||
assert self.state == 'init'
|
||||
self.card = card
|
||||
self.state = 'associated'
|
||||
|
||||
async def disassociate_card(self):
|
||||
assert self.state == 'associated'
|
||||
self.card = None
|
||||
self.state = 'init'
|
||||
|
||||
async def state_init(self):
|
||||
"""Wait for incoming 'select_card' and process it."""
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'select_card':
|
||||
# look-up if the card can be found
|
||||
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
|
||||
if not card:
|
||||
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
|
||||
continue
|
||||
# transition to next statee
|
||||
await card.associate_user(self)
|
||||
await self.associate_card(card)
|
||||
await self.tx_json('select_card_ack', {'identities': card.identity})
|
||||
break
|
||||
elif rx['msg_type'] == 'list_cards':
|
||||
res = [x.listing_entry() for x in card_clients]
|
||||
await self.tx_json('list_cards_ack', {'cards': res})
|
||||
else:
|
||||
self.tx_error('Unknown message type %s' % rx['msg_type'])
|
||||
|
||||
async def state_selected(self):
|
||||
while True:
|
||||
rx = await self.rx_json()
|
||||
if rx['msg_type'] == 'c_apdu':
|
||||
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
|
||||
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
|
||||
elif rx['msg_type'] == 'reset_req':
|
||||
await self.card.card_reset()
|
||||
await self.tx_json('reset_resp')
|
||||
else:
|
||||
self.logger.warning("Unknown/unsupported command '%s' received" % rx['msg_type'])
|
||||
|
||||
|
||||
|
||||
async def card_conn_hdlr(websocket):
|
||||
"""Handler for incoming connection to 'card' port."""
|
||||
# receive first message, which should be a 'hello'
|
||||
rx_raw = await websocket.recv()
|
||||
rx = json.loads(rx_raw)
|
||||
assert rx['msg_type'] == 'hello'
|
||||
client_type = rx['client_type']
|
||||
|
||||
if client_type != 'card':
|
||||
logger.error("Rejecting client (unknown type %s) connection", client_type)
|
||||
raise ValueError("client_type '%s' != expected 'card'" % client_type)
|
||||
|
||||
card = CardClient(websocket, rx)
|
||||
card.logger.info("connection accepted")
|
||||
# first go through identity phase
|
||||
async with asyncio.timeout(30):
|
||||
await card.tx_hello_ack()
|
||||
card.logger.info("hello-handshake completed")
|
||||
card_clients.add(card)
|
||||
# first obtain the identity of the card
|
||||
await card.identify()
|
||||
# then go into the "main loop"
|
||||
try:
|
||||
# wait 'indefinitely'. We cannot call websocket.recv() here, as we will call another
|
||||
# recv() while waiting for the R-APDU after forwarding one from the user client.
|
||||
await websocket.wait_closed()
|
||||
finally:
|
||||
card.logger.info("connection closed")
|
||||
card_clients.remove(card)
|
||||
|
||||
|
||||
async def user_conn_hdlr(websocket):
|
||||
"""Handler for incoming connection to 'user' port."""
|
||||
# receive first message, which should be a 'hello'
|
||||
rx_raw = await websocket.recv()
|
||||
rx = json.loads(rx_raw)
|
||||
assert rx['msg_type'] == 'hello'
|
||||
client_type = rx['client_type']
|
||||
|
||||
if client_type != 'user':
|
||||
logger.error("Rejecting client (unknown type %s) connection", client_type)
|
||||
raise ValueError("client_type '%s' != expected 'card'" % client_type)
|
||||
|
||||
user = UserClient(websocket, rx)
|
||||
user.logger.info("connection accepted")
|
||||
# first go through hello phase
|
||||
async with asyncio.timeout(10):
|
||||
await user.tx_hello_ack()
|
||||
user.logger.info("hello-handshake completed")
|
||||
user_clients.add(user)
|
||||
# first wait for the user to specify the select the card
|
||||
try:
|
||||
await user.state_init()
|
||||
except ConnectionClosedError:
|
||||
user.logger.info("connection closed")
|
||||
user_clients.remove(user)
|
||||
return
|
||||
except asyncio.exceptions.CancelledError: # direct cause of TimeoutError
|
||||
user.logger.error("User failed to transition to 'associated' state within 10s")
|
||||
user_clients.remove(user)
|
||||
return
|
||||
except Exception as e:
|
||||
user.logger.error("Unknown exception %s", e)
|
||||
raise e
|
||||
user_clients.remove(user)
|
||||
return
|
||||
# then perform APDU exchanges without any time limit
|
||||
try:
|
||||
await user.state_selected()
|
||||
except ConnectionClosedError:
|
||||
pass
|
||||
finally:
|
||||
user.logger.info("connection closed")
|
||||
await user.card.disassociate_user()
|
||||
await user.disassociate_card()
|
||||
user_clients.remove(user)
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description="""Osmocom WebSocket Remote Card server""")
|
||||
parser.add_argument('--user-bind-port', type=int, default=WSRC_DEFAULT_PORT_USER,
|
||||
help="port number to bind for connections from users")
|
||||
parser.add_argument('--user-bind-host', type=str, default='localhost',
|
||||
help="local Host/IP to bind for connections from users")
|
||||
parser.add_argument('--card-bind-port', type=int, default=WSRC_DEFAULT_PORT_CARD,
|
||||
help="port number to bind for connections from cards")
|
||||
parser.add_argument('--card-bind-host', type=str, default='localhost',
|
||||
help="local Host/IP to bind for connections from cards")
|
||||
|
||||
if __name__ == '__main__':
|
||||
opts = parser.parse_args()
|
||||
|
||||
async def main():
|
||||
# we use different ports for user + card connections to ensure they can
|
||||
# have different packet filter rules apply to them
|
||||
async with serve(card_conn_hdlr, opts.card_bind_host, opts.card_bind_port), serve(user_conn_hdlr, opts.user_bind_host, opts.user_bind_port):
|
||||
await asyncio.get_running_loop().create_future() # run forever
|
||||
|
||||
asyncio.run(main())
|
||||
@@ -43,6 +43,7 @@ pySim consists of several parts:
|
||||
legacy
|
||||
library
|
||||
osmo-smdpp
|
||||
wsrc
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
||||
57
docs/wsrc.rst
Normal file
57
docs/wsrc.rst
Normal file
@@ -0,0 +1,57 @@
|
||||
WebSocket Remote Card (WSRC)
|
||||
============================
|
||||
|
||||
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
|
||||
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
|
||||
method was chosen to be as firewall/NAT friendly as possible.
|
||||
|
||||
WSRC Network Architecture
|
||||
-------------------------
|
||||
|
||||
In a WSRC network, there are three major elements:
|
||||
|
||||
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
|
||||
to a remote *WSRC Server*
|
||||
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
|
||||
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
|
||||
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
|
||||
|
||||
WSRC Protocol
|
||||
-------------
|
||||
|
||||
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
|
||||
is based on HTTP and should usually operate via TLS for security reasons.
|
||||
|
||||
The detailed protocol is currently still WIP. The plan is to document it here.
|
||||
|
||||
|
||||
pySim implementations
|
||||
---------------------
|
||||
|
||||
|
||||
wsrc_server
|
||||
~~~~~~~~~~~~~~~~
|
||||
.. argparse::
|
||||
:filename: ../contrib/wsrc_server.py
|
||||
:func: parser
|
||||
:prog: contrib/wsrc_server.py
|
||||
|
||||
|
||||
wsrc_card_client
|
||||
~~~~~~~~~~~~~~~~
|
||||
.. argparse::
|
||||
:filename: ../contrib/wsrc_card_client.py
|
||||
:func: parser
|
||||
:prog: contrib/wsrc_card_client.py
|
||||
|
||||
pySim-shell
|
||||
~~~~~~~~~~~
|
||||
|
||||
pySim-shell can talk to a remote card via WSRC if you use the *wsrc transport*, for example like this:
|
||||
|
||||
::
|
||||
|
||||
./pySim-shell.py --wsrc-eid 89882119900000000000000000007280 --wsrc-serer-url ws://localhost:4220/
|
||||
|
||||
|
||||
You can specify `--wsrc-eid` or `--wsrc-iccid` to identify the remote eUICC or UICC you would like to select.
|
||||
@@ -39,7 +39,7 @@ from pySim.commands import SimCardCommands
|
||||
from pySim.transport import init_reader, argparse_add_reader_args
|
||||
from pySim.legacy.cards import _cards_classes, card_detect
|
||||
from pySim.utils import derive_milenage_opc, calculate_luhn, dec_iccid
|
||||
from pySim.ts_51_011 import EF_AD
|
||||
from pySim.profile.ts_51_011 import EF_AD
|
||||
from pySim.legacy.ts_51_011 import EF
|
||||
from pySim.card_handler import *
|
||||
from pySim.utils import *
|
||||
|
||||
@@ -31,7 +31,7 @@ import sys
|
||||
|
||||
from osmocom.utils import h2b, h2s, swap_nibbles, rpad
|
||||
|
||||
from pySim.ts_51_011 import EF_SST_map, EF_AD
|
||||
from pySim.profile.ts_51_011 import EF_SST_map, EF_AD
|
||||
from pySim.legacy.ts_51_011 import EF, DF
|
||||
from pySim.ts_31_102 import EF_UST_map
|
||||
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
||||
|
||||
@@ -60,7 +60,6 @@ from pySim.card_handler import CardHandler, CardHandlerAuto
|
||||
from pySim.filesystem import CardMF, CardEF, CardDF, CardADF, LinFixedEF, TransparentEF, BerTlvEF
|
||||
from pySim.ts_102_221 import pin_names
|
||||
from pySim.ts_102_222 import Ts102222Commands
|
||||
from pySim.gsm_r import DF_EIRENE
|
||||
from pySim.cat import ProactiveCommand
|
||||
|
||||
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
||||
|
||||
@@ -13,7 +13,7 @@ from osmocom.utils import JsonEncoder
|
||||
from pySim.cards import UiccCardBase
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.profile import CardProfile
|
||||
from pySim.ts_102_221 import CardProfileUICC
|
||||
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||
from pySim.ts_31_102 import CardApplicationUSIM
|
||||
from pySim.ts_31_103 import CardApplicationISIM
|
||||
from pySim.euicc import CardApplicationISDR, CardApplicationECASD
|
||||
|
||||
@@ -22,8 +22,8 @@ from pySim.filesystem import CardModel, CardApplication
|
||||
from pySim.cards import card_detect, SimCardBase, UiccCardBase, CardBase
|
||||
from pySim.runtime import RuntimeState
|
||||
from pySim.profile import CardProfile
|
||||
from pySim.cdma_ruim import CardProfileRUIM
|
||||
from pySim.ts_102_221 import CardProfileUICC
|
||||
from pySim.profile.cdma_ruim import CardProfileRUIM
|
||||
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||
from pySim.utils import all_subclasses
|
||||
from pySim.exceptions import SwMatchError
|
||||
|
||||
@@ -40,7 +40,7 @@ import pySim.ts_31_103
|
||||
import pySim.ts_31_104
|
||||
import pySim.ara_m
|
||||
import pySim.global_platform
|
||||
import pySim.euicc
|
||||
import pySim.profile.euicc
|
||||
|
||||
def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState, SimCardBase]:
|
||||
"""
|
||||
|
||||
@@ -25,8 +25,8 @@
|
||||
from typing import Optional, Tuple
|
||||
from osmocom.utils import *
|
||||
|
||||
from pySim.ts_102_221 import EF_DIR, CardProfileUICC
|
||||
from pySim.ts_51_011 import DF_GSM
|
||||
from pySim.profile.ts_102_221 import EF_DIR, CardProfileUICC
|
||||
from pySim.profile.ts_51_011 import DF_GSM
|
||||
from pySim.utils import SwHexstr
|
||||
from pySim.commands import Path, SimCardCommands
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ from osmocom.tlv import bertlv_encode_len
|
||||
from pySim.utils import sw_match, expand_hex, SwHexstr, ResTuple, SwMatchstr
|
||||
from pySim.exceptions import SwMatchError
|
||||
from pySim.transport import LinkBase
|
||||
from pySim.ts_102_221 import decode_select_response
|
||||
|
||||
# A path can be either just a FID or a list of FID
|
||||
Path = typing.Union[Hexstr, List[Hexstr]]
|
||||
@@ -176,40 +177,6 @@ class SimCardCommands:
|
||||
raise SwMatchError(sw, sw_exp.lower(), self._tp.sw_interpreter)
|
||||
return (rsp, sw)
|
||||
|
||||
# Extract a single FCP item from TLV
|
||||
def __parse_fcp(self, fcp: Hexstr):
|
||||
# see also: ETSI TS 102 221, chapter 11.1.1.3.1 Response for MF,
|
||||
# DF or ADF
|
||||
from pytlv.TLV import TLV
|
||||
tlvparser = TLV(['82', '83', '84', 'a5', '8a', '8b',
|
||||
'8c', '80', 'ab', 'c6', '81', '88'])
|
||||
|
||||
# pytlv is case sensitive!
|
||||
fcp = fcp.lower()
|
||||
|
||||
if fcp[0:2] != '62':
|
||||
raise ValueError(
|
||||
'Tag of the FCP template does not match, expected 62 but got %s' % fcp[0:2])
|
||||
|
||||
# Unfortunately the spec is not very clear if the FCP length is
|
||||
# coded as one or two byte vale, so we have to try it out by
|
||||
# checking if the length of the remaining TLV string matches
|
||||
# what we get in the length field.
|
||||
# See also ETSI TS 102 221, chapter 11.1.1.3.0 Base coding.
|
||||
# TODO: this likely just is normal BER-TLV ("All data objects are BER-TLV except if otherwise # defined.")
|
||||
exp_tlv_len = int(fcp[2:4], 16)
|
||||
if len(fcp[4:]) // 2 == exp_tlv_len:
|
||||
skip = 4
|
||||
else:
|
||||
exp_tlv_len = int(fcp[2:6], 16)
|
||||
if len(fcp[4:]) // 2 == exp_tlv_len:
|
||||
skip = 6
|
||||
raise ValueError('Cannot determine length of TLV-length')
|
||||
|
||||
# Skip FCP tag and length
|
||||
tlv = fcp[skip:]
|
||||
return tlvparser.parse(tlv)
|
||||
|
||||
# Tell the length of a record by the card response
|
||||
# USIMs respond with an FCP template, which is different
|
||||
# from what SIMs responds. See also:
|
||||
@@ -217,10 +184,8 @@ class SimCardCommands:
|
||||
# SIM: GSM 11.11, chapter 9.2.1 SELECT
|
||||
def __record_len(self, r) -> int:
|
||||
if self.sel_ctrl == "0004":
|
||||
tlv_parsed = self.__parse_fcp(r[-1])
|
||||
file_descriptor = tlv_parsed['82']
|
||||
# See also ETSI TS 102 221, chapter 11.1.1.4.3 File Descriptor
|
||||
return int(file_descriptor[4:8], 16)
|
||||
fcp_parsed = decode_select_response(r[-1])
|
||||
return fcp_parsed['file_descriptor']['record_len']
|
||||
else:
|
||||
return int(r[-1][28:30], 16)
|
||||
|
||||
@@ -228,8 +193,8 @@ class SimCardCommands:
|
||||
# above.
|
||||
def __len(self, r) -> int:
|
||||
if self.sel_ctrl == "0004":
|
||||
tlv_parsed = self.__parse_fcp(r[-1])
|
||||
return int(tlv_parsed['80'], 16)
|
||||
fcp_parsed = decode_select_response(r[-1])
|
||||
return fcp_parsed['file_size']
|
||||
else:
|
||||
return int(r[-1][4:8], 16)
|
||||
|
||||
|
||||
@@ -34,7 +34,6 @@ from osmocom.construct import *
|
||||
from pySim.exceptions import SwMatchError
|
||||
from pySim.utils import Hexstr, SwHexstr, SwMatchstr
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.ts_102_221 import CardProfileUICC
|
||||
import pySim.global_platform
|
||||
|
||||
# SGP.02 Section 2.2.2
|
||||
@@ -557,43 +556,3 @@ class CardApplicationECASD(pySim.global_platform.CardApplicationSD):
|
||||
@with_default_category('Application-Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
pass
|
||||
|
||||
class CardProfileEuiccSGP32(CardProfileUICC):
|
||||
ORDER = 5
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='IoT eUICC (SGP.32)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
# try a command only supported by SGP.32
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ISD_R)
|
||||
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
|
||||
|
||||
class CardProfileEuiccSGP22(CardProfileUICC):
|
||||
ORDER = 6
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='Consumer eUICC (SGP.22)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
# try to read EID from ISD-R
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ISD_R)
|
||||
eid = CardApplicationISDR.get_eid(scc)
|
||||
# TODO: Store EID identity?
|
||||
|
||||
class CardProfileEuiccSGP02(CardProfileUICC):
|
||||
ORDER = 7
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='M2M eUICC (SGP.02)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ECASD)
|
||||
scc.get_data(0x5a)
|
||||
# TODO: Store EID identity?
|
||||
|
||||
@@ -16,7 +16,7 @@ from pySim.legacy.ts_51_011 import EF, DF
|
||||
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
|
||||
from pySim.legacy.ts_31_103 import EF_ISIM_ADF_map
|
||||
|
||||
from pySim.ts_51_011 import EF_AD, EF_SPN
|
||||
from pySim.profile.ts_51_011 import EF_AD, EF_SPN
|
||||
|
||||
def format_addr(addr: str, addr_type: str) -> str:
|
||||
"""
|
||||
|
||||
@@ -148,7 +148,7 @@ def dec_st(st, table="sim") -> str:
|
||||
from pySim.ts_31_102 import EF_UST_map
|
||||
lookup_map = EF_UST_map
|
||||
else:
|
||||
from pySim.ts_51_011 import EF_SST_map
|
||||
from pySim.profile.ts_51_011 import EF_SST_map
|
||||
lookup_map = EF_SST_map
|
||||
|
||||
st_bytes = [st[i:i+2] for i in range(0, len(st), 2)]
|
||||
|
||||
@@ -25,9 +25,9 @@ from osmocom.construct import *
|
||||
|
||||
from pySim.filesystem import *
|
||||
from pySim.profile import CardProfile, CardProfileAddon
|
||||
from pySim.ts_51_011 import CardProfileSIM
|
||||
from pySim.ts_51_011 import DF_TELECOM, DF_GSM
|
||||
from pySim.ts_51_011 import EF_ServiceTable
|
||||
from pySim.profile.ts_51_011 import CardProfileSIM
|
||||
from pySim.profile.ts_51_011 import DF_TELECOM, DF_GSM
|
||||
from pySim.profile.ts_51_011 import EF_ServiceTable
|
||||
|
||||
|
||||
# Mapping between CDMA Service Number and its description
|
||||
58
pySim/profile/euicc.py
Normal file
58
pySim/profile/euicc.py
Normal file
@@ -0,0 +1,58 @@
|
||||
# Copyright (C) 2023 Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||
from pySim.commands import SimCardCommands
|
||||
from pySim.euicc import CardApplicationISDR, AID_ISD_R, AID_ECASD, GetCertsReq, GetCertsResp
|
||||
|
||||
class CardProfileEuiccSGP32(CardProfileUICC):
|
||||
ORDER = 5
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='IoT eUICC (SGP.32)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
# try a command only supported by SGP.32
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ISD_R)
|
||||
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
|
||||
|
||||
class CardProfileEuiccSGP22(CardProfileUICC):
|
||||
ORDER = 6
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='Consumer eUICC (SGP.22)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
# try to read EID from ISD-R
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ISD_R)
|
||||
eid = CardApplicationISDR.get_eid(scc)
|
||||
# TODO: Store EID identity?
|
||||
|
||||
class CardProfileEuiccSGP02(CardProfileUICC):
|
||||
ORDER = 7
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(name='M2M eUICC (SGP.02)')
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
scc.cla_byte = "00"
|
||||
scc.select_adf(AID_ECASD)
|
||||
scc.get_data(0x5a)
|
||||
# TODO: Store EID identity?
|
||||
394
pySim/profile/ts_102_221.py
Normal file
394
pySim/profile/ts_102_221.py
Normal file
@@ -0,0 +1,394 @@
|
||||
# coding=utf-8
|
||||
"""Card Profile of ETSI TS 102 221, the core UICC spec.
|
||||
|
||||
(C) 2021-2024 by Harald Welte <laforge@osmocom.org>
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
from construct import Struct, FlagsEnum, GreedyString
|
||||
|
||||
from osmocom.construct import *
|
||||
from osmocom.utils import *
|
||||
from osmocom.tlv import BER_TLV_IE
|
||||
|
||||
from pySim.utils import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.profile import CardProfile
|
||||
from pySim import iso7816_4
|
||||
from pySim.ts_102_221 import decode_select_response, ts_102_22x_cmdset
|
||||
from pySim.ts_102_221 import AM_DO_EF, SC_DO, AdditionalInterfacesSupport, AdditionalTermCapEuicc
|
||||
from pySim.ts_102_221 import TerminalPowerSupply, ExtendedLchanTerminalSupport, TerminalCapability
|
||||
|
||||
# A UICC will usually also support 2G functionality. If this is the case, we
|
||||
# need to add DF_GSM and DF_TELECOM along with the UICC related files
|
||||
from pySim.profile.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
|
||||
from pySim.profile.gsm_r import AddonGSMR
|
||||
from pySim.profile.cdma_ruim import AddonRUIM
|
||||
|
||||
|
||||
# TS 102 221 Section 13.1
|
||||
class EF_DIR(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
|
||||
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
|
||||
{ "application_label": "USim1" },
|
||||
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
|
||||
),
|
||||
( '61194f10a0000000871004ffffffff890709000050054953696d31',
|
||||
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
|
||||
{ "application_label": "ISim1" } ] }
|
||||
),
|
||||
]
|
||||
class ApplicationLabel(BER_TLV_IE, tag=0x50):
|
||||
# TODO: UCS-2 coding option as per Annex A of TS 102 221
|
||||
_construct = GreedyString('ascii')
|
||||
|
||||
# see https://github.com/PyCQA/pylint/issues/5794
|
||||
#pylint: disable=undefined-variable
|
||||
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
|
||||
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
|
||||
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
|
||||
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
|
||||
iso7816_4.ApplicationRelatedDOSet]):
|
||||
pass
|
||||
|
||||
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
|
||||
self._tlv = EF_DIR.ApplicationTemplate
|
||||
|
||||
|
||||
# TS 102 221 Section 13.4
|
||||
class EF_ARR(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
|
||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||
{ "control_reference_template": "ADM1" } ],
|
||||
[ { "access_mode": [ "write_append", "update_erase" ] },
|
||||
{ "always": None } ],
|
||||
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
|
||||
{ "never": None } ],
|
||||
[ { "command_header": { "INS": 212 } },
|
||||
{ "control_reference_template": "ADM1" } ]
|
||||
] ),
|
||||
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
|
||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||
{ "always": None } ],
|
||||
[ { "access_mode": [ "update_erase" ] },
|
||||
{ "never": None } ],
|
||||
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
|
||||
{ "control_reference_template": "ADM1" } ],
|
||||
[ { "command_header": { "INS": 212 } },
|
||||
{ "control_reference_template": "ADM1" } ]
|
||||
] ),
|
||||
]
|
||||
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc)
|
||||
# add those commands to the general commands of a TransparentEF
|
||||
self.shell_commands += [self.AddlShellCommands()]
|
||||
|
||||
@staticmethod
|
||||
def flatten(inp: list):
|
||||
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
|
||||
def sc_abbreviate(sc):
|
||||
if 'always' in sc:
|
||||
return 'always'
|
||||
elif 'never' in sc:
|
||||
return 'never'
|
||||
elif 'control_reference_template' in sc:
|
||||
return sc['control_reference_template']
|
||||
else:
|
||||
return sc
|
||||
|
||||
by_mode = {}
|
||||
for t in inp:
|
||||
am = t[0]
|
||||
sc = t[1]
|
||||
sc_abbr = sc_abbreviate(sc)
|
||||
if 'access_mode' in am:
|
||||
for m in am['access_mode']:
|
||||
by_mode[m] = sc_abbr
|
||||
elif 'command_header' in am:
|
||||
ins = am['command_header']['INS']
|
||||
if 'CLA' in am['command_header']:
|
||||
cla = am['command_header']['CLA']
|
||||
else:
|
||||
cla = None
|
||||
cmd = ts_102_22x_cmdset.lookup(ins, cla)
|
||||
if cmd:
|
||||
name = cmd.name.lower().replace(' ', '_')
|
||||
by_mode[name] = sc_abbr
|
||||
else:
|
||||
raise ValueError
|
||||
else:
|
||||
raise ValueError
|
||||
return by_mode
|
||||
|
||||
def _decode_record_bin(self, raw_bin_data, **kwargs):
|
||||
# we can only guess if we should decode for EF or DF here :(
|
||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||
dec = arr_seq.decode_multi(raw_bin_data)
|
||||
# we cannot pass the result through flatten() here, as we don't have a related
|
||||
# 'un-flattening' decoder, and hence would be unable to encode :(
|
||||
return dec[0]
|
||||
|
||||
def _encode_record_bin(self, in_json, **kwargs):
|
||||
# we can only guess if we should decode for EF or DF here :(
|
||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||
return arr_seq.encode_multi(in_json)
|
||||
|
||||
@with_default_category('File-Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
|
||||
def do_read_arr_record(self, opts):
|
||||
"""Read one EF.ARR record in flattened, human-friendly form."""
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
|
||||
data = self._cmd.lchan.selected_file.flatten(data)
|
||||
self._cmd.poutput_json(data, opts.oneline)
|
||||
|
||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
|
||||
def do_read_arr_records(self, opts):
|
||||
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
|
||||
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
|
||||
# collect all results in list so they are rendered as JSON list when printing
|
||||
data_list = []
|
||||
for recnr in range(1, 1 + num_of_rec):
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
|
||||
data = self._cmd.lchan.selected_file.flatten(data)
|
||||
data_list.append(data)
|
||||
self._cmd.poutput_json(data_list, opts.oneline)
|
||||
|
||||
|
||||
# TS 102 221 Section 13.6
|
||||
class EF_UMPC(TransparentEF):
|
||||
_test_de_encode = [
|
||||
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
|
||||
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
|
||||
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
|
||||
"support_uicc_suspend": False } } ),
|
||||
]
|
||||
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
|
||||
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
|
||||
support_uicc_suspend=2)
|
||||
self._construct = Struct(
|
||||
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
|
||||
|
||||
|
||||
class CardProfileUICC(CardProfile):
|
||||
|
||||
ORDER = 10
|
||||
|
||||
def __init__(self, name='UICC'):
|
||||
files = [
|
||||
EF_DIR(),
|
||||
EF_ICCID(),
|
||||
EF_PL(),
|
||||
EF_ARR(),
|
||||
# FIXME: DF.CD
|
||||
EF_UMPC(),
|
||||
]
|
||||
addons = [
|
||||
AddonSIM,
|
||||
AddonGSMR,
|
||||
AddonRUIM,
|
||||
]
|
||||
sw = {
|
||||
'Normal': {
|
||||
'9000': 'Normal ending of the command',
|
||||
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
|
||||
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
|
||||
},
|
||||
'Postponed processing': {
|
||||
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
|
||||
},
|
||||
'Warnings': {
|
||||
'6200': 'No information given, state of non-volatile memory unchanged',
|
||||
'6281': 'Part of returned data may be corrupted',
|
||||
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
|
||||
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
|
||||
'6284': 'Selected file in termination state',
|
||||
'62f1': 'More data available',
|
||||
'62f2': 'More data available and proactive command pending',
|
||||
'62f3': 'Response data available',
|
||||
'63f1': 'More data expected',
|
||||
'63f2': 'More data expected and proactive command pending',
|
||||
'63cx': 'Command successful but after using an internal update retry routine X times',
|
||||
},
|
||||
'Execution errors': {
|
||||
'6400': 'No information given, state of non-volatile memory unchanged',
|
||||
'6500': 'No information given, state of non-volatile memory changed',
|
||||
'6581': 'Memory problem',
|
||||
},
|
||||
'Checking errors': {
|
||||
'6700': 'Wrong length',
|
||||
'67xx': 'The interpretation of this status word is command dependent',
|
||||
'6b00': 'Wrong parameter(s) P1-P2',
|
||||
'6d00': 'Instruction code not supported or invalid',
|
||||
'6e00': 'Class not supported',
|
||||
'6f00': 'Technical problem, no precise diagnosis',
|
||||
'6fxx': 'The interpretation of this status word is command dependent',
|
||||
},
|
||||
'Functions in CLA not supported': {
|
||||
'6800': 'No information given',
|
||||
'6881': 'Logical channel not supported',
|
||||
'6882': 'Secure messaging not supported',
|
||||
},
|
||||
'Command not allowed': {
|
||||
'6900': 'No information given',
|
||||
'6981': 'Command incompatible with file structure',
|
||||
'6982': 'Security status not satisfied',
|
||||
'6983': 'Authentication/PIN method blocked',
|
||||
'6984': 'Referenced data invalidated',
|
||||
'6985': 'Conditions of use not satisfied',
|
||||
'6986': 'Command not allowed (no EF selected)',
|
||||
'6989': 'Command not allowed - secure channel - security not satisfied',
|
||||
},
|
||||
'Wrong parameters': {
|
||||
'6a80': 'Incorrect parameters in the data field',
|
||||
'6a81': 'Function not supported',
|
||||
'6a82': 'File not found',
|
||||
'6a83': 'Record not found',
|
||||
'6a84': 'Not enough memory space',
|
||||
'6a86': 'Incorrect parameters P1 to P2',
|
||||
'6a87': 'Lc inconsistent with P1 to P2',
|
||||
'6a88': 'Referenced data not found',
|
||||
},
|
||||
'Application errors': {
|
||||
'9850': 'INCREASE cannot be performed, max value reached',
|
||||
'9862': 'Authentication error, application specific',
|
||||
'9863': 'Security session or association expired',
|
||||
'9864': 'Minimum UICC suspension time is too long',
|
||||
},
|
||||
}
|
||||
|
||||
super().__init__(name, desc='ETSI TS 102 221', cla="00",
|
||||
sel_ctrl="0004", files_in_mf=files, sw=sw,
|
||||
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
|
||||
|
||||
@staticmethod
|
||||
def decode_select_response(data_hex: str) -> object:
|
||||
"""ETSI TS 102 221 Section 11.1.1.3"""
|
||||
return decode_select_response(data_hex)
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
|
||||
card is considered a UICC card."""
|
||||
cls._mf_select_test(scc, "00", "0004", ["3f00"])
|
||||
|
||||
@with_default_category('TS 102 221 Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
suspend_uicc_parser = argparse.ArgumentParser()
|
||||
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
|
||||
help='Proposed minimum duration of suspension')
|
||||
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
|
||||
help='Proposed maximum duration of suspension')
|
||||
|
||||
# not ISO7816-4 but TS 102 221
|
||||
@cmd2.with_argparser(suspend_uicc_parser)
|
||||
def do_suspend_uicc(self, opts):
|
||||
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
|
||||
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
|
||||
max_len_secs=opts.max_duration_secs)
|
||||
self._cmd.poutput(
|
||||
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
|
||||
|
||||
resume_uicc_parser = argparse.ArgumentParser()
|
||||
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
|
||||
|
||||
@cmd2.with_argparser(resume_uicc_parser)
|
||||
def do_resume_uicc(self, opts):
|
||||
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
|
||||
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
|
||||
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
|
||||
self._cmd.card._scc.resume_uicc(opts.TOKEN)
|
||||
|
||||
term_cap_parser = argparse.ArgumentParser()
|
||||
# power group
|
||||
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
|
||||
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
|
||||
help='Actual used Supply voltage class')
|
||||
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
|
||||
help='Maximum available power supply of the terminal')
|
||||
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
|
||||
help='Actual used clock frequency (in units of 100kHz)')
|
||||
# no separate groups for those two
|
||||
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
|
||||
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
|
||||
help='Extended Logical Channel supported')
|
||||
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
|
||||
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
|
||||
help='Local User Interface in the Device (LUId) supported')
|
||||
# eUICC group
|
||||
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
|
||||
tc_euicc_grp.add_argument('--lui-d', action='store_true',
|
||||
help='Local User Interface in the Device (LUId) supported')
|
||||
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
|
||||
help='Local Profile Download in the Device (LPDd) supported')
|
||||
tc_euicc_grp.add_argument('--lds-d', action='store_true',
|
||||
help='Local Discovery Service in the Device (LPDd) supported')
|
||||
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
|
||||
help='LUIe based on SCWS supported')
|
||||
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
|
||||
help='Metadata update alerting supported')
|
||||
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
|
||||
help='Enterprise Capable Device')
|
||||
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
|
||||
help='LUIe using E4E (ENVELOPE tag E4) supported')
|
||||
tc_euicc_grp.add_argument('--lpr', action='store_true',
|
||||
help='LPR (LPA Proxy) supported')
|
||||
|
||||
@cmd2.with_argparser(term_cap_parser)
|
||||
def do_terminal_capability(self, opts):
|
||||
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
|
||||
ps_flags = {}
|
||||
addl_if_flags = {}
|
||||
euicc_flags = {}
|
||||
|
||||
opts_dict = vars(opts)
|
||||
|
||||
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
|
||||
if any(opts_dict[x] for x in power_items):
|
||||
if not all(opts_dict[x] for x in power_items):
|
||||
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
|
||||
|
||||
for k, v in opts_dict.items():
|
||||
if k in AdditionalInterfacesSupport._construct.flags.keys():
|
||||
addl_if_flags[k] = v
|
||||
elif k in AdditionalTermCapEuicc._construct.flags.keys():
|
||||
euicc_flags[k] = v
|
||||
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
|
||||
if k == 'used_supply_voltage_class' and v:
|
||||
v = {v: True}
|
||||
ps_flags[k] = v
|
||||
|
||||
child_list = []
|
||||
if any(x for x in ps_flags.values()):
|
||||
child_list.append(TerminalPowerSupply(decoded=ps_flags))
|
||||
|
||||
if opts.extended_logical_channel:
|
||||
child_list.append(ExtendedLchanTerminalSupport())
|
||||
if any(x for x in addl_if_flags.values()):
|
||||
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
|
||||
if any(x for x in euicc_flags.values()):
|
||||
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
|
||||
|
||||
print(child_list)
|
||||
tc = TerminalCapability(children=child_list)
|
||||
self.terminal_capability(b2h(tc.to_tlv()))
|
||||
|
||||
def terminal_capability(self, data:Hexstr):
|
||||
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
|
||||
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
||||
@@ -43,7 +43,7 @@ from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_
|
||||
from pySim.profile import CardProfile, CardProfileAddon
|
||||
from pySim.filesystem import *
|
||||
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
|
||||
from pySim.gsm_r import AddonGSMR
|
||||
from pySim.profile.gsm_r import AddonGSMR
|
||||
|
||||
# Mapping between SIM Service Number and its description
|
||||
EF_SST_map = {
|
||||
@@ -25,6 +25,7 @@ from osmocom.utils import *
|
||||
from osmocom.construct import *
|
||||
|
||||
from pySim.filesystem import *
|
||||
from pySim.profile.ts_102_221 import CardProfileUICC
|
||||
from pySim.runtime import RuntimeState
|
||||
import pySim
|
||||
|
||||
@@ -180,7 +181,7 @@ class DF_SYSTEM(CardDF):
|
||||
self.add_files(files)
|
||||
|
||||
def decode_select_response(self, resp_hex):
|
||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(resp_hex)
|
||||
return CardProfileUICC.decode_select_response(resp_hex)
|
||||
|
||||
|
||||
class EF_USIM_SQN(TransparentEF):
|
||||
|
||||
@@ -328,11 +328,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
from pySim.transport.pcsc import PcscSimLink
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
from pySim.transport.calypso import CalypsoSimLink
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
|
||||
SerialSimLink.argparse_add_reader_args(arg_parser)
|
||||
PcscSimLink.argparse_add_reader_args(arg_parser)
|
||||
ModemATCommandLink.argparse_add_reader_args(arg_parser)
|
||||
CalypsoSimLink.argparse_add_reader_args(arg_parser)
|
||||
WsrcSimLink.argparse_add_reader_args(arg_parser)
|
||||
arg_parser.add_argument('--apdu-trace', action='store_true',
|
||||
help='Trace the command/response APDUs exchanged with the card')
|
||||
|
||||
@@ -355,6 +357,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
|
||||
elif opts.modem_dev is not None:
|
||||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
sl = ModemATCommandLink(opts, **kwargs)
|
||||
elif opts.wsrc_server_url is not None:
|
||||
from pySim.transport.wsrc import WsrcSimLink
|
||||
sl = WsrcSimLink(opts, **kwargs)
|
||||
else: # Serial reader is default
|
||||
print("No reader/driver specified; falling back to default (Serial reader)")
|
||||
from pySim.transport.serial import SerialSimLink
|
||||
|
||||
99
pySim/transport/wsrc.py
Normal file
99
pySim/transport/wsrc.py
Normal file
@@ -0,0 +1,99 @@
|
||||
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import argparse
|
||||
from typing import Optional
|
||||
|
||||
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
|
||||
|
||||
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
|
||||
from pySim.transport import LinkBase
|
||||
from pySim.utils import ResTuple
|
||||
from pySim.wsrc import WSRC_DEFAULT_PORT_USER
|
||||
from pySim.wsrc.client_blocking import WsClientBlocking
|
||||
|
||||
class UserWsClientBlocking(WsClientBlocking):
|
||||
def __init__(self, ws_uri: str, **kwargs):
|
||||
super().__init__('user', ws_uri, **kwargs)
|
||||
|
||||
def select_card(self, id_type:str, id_str:str):
|
||||
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
|
||||
'select_card_ack')
|
||||
return rx
|
||||
|
||||
def reset_card(self):
|
||||
self.transceive_json('reset_req', {}, 'reset_resp')
|
||||
|
||||
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
|
||||
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
|
||||
return rx['response'], rx['sw']
|
||||
|
||||
|
||||
class WsrcSimLink(LinkBase):
|
||||
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
|
||||
name = 'WSRC'
|
||||
|
||||
def __init__(self, opts: argparse.Namespace, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.identities = {}
|
||||
self.server_url = opts.wsrc_server_url
|
||||
if opts.wsrc_eid:
|
||||
self.id_type = 'eid'
|
||||
self.id_str = opts.wsrc_eid
|
||||
elif opts.wsrc_iccid:
|
||||
self.id_type = 'iccid'
|
||||
self.id_str = opts.wsrc_iccid
|
||||
self.client = UserWsClientBlocking(self.server_url)
|
||||
self.client.connect()
|
||||
|
||||
def __del__(self):
|
||||
# FIXME: disconnect from server
|
||||
pass
|
||||
|
||||
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
|
||||
self.connect()
|
||||
|
||||
def connect(self):
|
||||
rx = self.client.select_card(self.id_type, self.id_str)
|
||||
self.identities = rx['identities']
|
||||
|
||||
def get_atr(self) -> Hexstr:
|
||||
return h2i(self.identities['ATR'])
|
||||
|
||||
def disconnect(self):
|
||||
self.__delete__()
|
||||
|
||||
def _reset_card(self):
|
||||
self.client.reset_card()
|
||||
return 1
|
||||
|
||||
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
|
||||
return self.client.xceive_apdu_raw(pdu)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
|
||||
|
||||
@staticmethod
|
||||
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
|
||||
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
|
||||
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
|
||||
can be accessed via a network.""")
|
||||
wsrc_group.add_argument('--wsrc-server-url', default='ws://localhost:%u' % WSRC_DEFAULT_PORT_USER,
|
||||
help='URI of the WSRC server to connect to')
|
||||
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
|
||||
help='ICCID of the card to open via WSRC')
|
||||
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
|
||||
help='EID of the card to open via WSRC')
|
||||
@@ -18,22 +18,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
from bidict import bidict
|
||||
|
||||
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, GreedyString, ValidationError
|
||||
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, ValidationError
|
||||
from construct import Optional as COptional, Computed
|
||||
|
||||
from osmocom.construct import *
|
||||
from osmocom.utils import *
|
||||
from osmocom.tlv import *
|
||||
from osmocom.tlv import BER_TLV_IE, flatten_dict_lists
|
||||
from pySim.utils import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.profile import CardProfile
|
||||
from pySim import iso7816_4
|
||||
|
||||
# A UICC will usually also support 2G functionality. If this is the case, we
|
||||
# need to add DF_GSM and DF_TELECOM along with the UICC related files
|
||||
from pySim.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
|
||||
from pySim.gsm_r import AddonGSMR
|
||||
from pySim.cdma_ruim import AddonRUIM
|
||||
#from pySim.filesystem import *
|
||||
#from pySim.profile import CardProfile
|
||||
|
||||
ts_102_22x_cmdset = CardCommandSet('TS 102 22x', [
|
||||
# TS 102 221 Section 10.1.2 Table 10.5 "Coding of Instruction Byte"
|
||||
@@ -288,6 +282,12 @@ class FcpTemplate(BER_TLV_IE, tag=0x62, nested=[FileSize, TotalFileSize, FileDes
|
||||
PinStatusTemplate_DO]):
|
||||
pass
|
||||
|
||||
def decode_select_response(data_hex: str) -> object:
|
||||
"""ETSI TS 102 221 Section 11.1.1.3"""
|
||||
t = FcpTemplate()
|
||||
t.from_tlv(h2b(data_hex))
|
||||
d = t.to_dict()
|
||||
return flatten_dict_lists(d['fcp_template'])
|
||||
|
||||
def tlv_key_replace(inmap, indata):
|
||||
def newkey(inmap, key):
|
||||
@@ -634,361 +634,3 @@ NOT_DO = Nested_DO('not', 0xaf, NOT_Template)
|
||||
SC_DO = DataObjectChoice('security_condition', 'Security Condition',
|
||||
members=[Always_DO, Never_DO, SecCondByte_DO(), SecCondByte_DO(0x9e), CRT_DO(),
|
||||
OR_DO, AND_DO, NOT_DO])
|
||||
|
||||
# TS 102 221 Section 13.1
|
||||
class EF_DIR(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
|
||||
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
|
||||
{ "application_label": "USim1" },
|
||||
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
|
||||
),
|
||||
( '61194f10a0000000871004ffffffff890709000050054953696d31',
|
||||
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
|
||||
{ "application_label": "ISim1" } ] }
|
||||
),
|
||||
]
|
||||
class ApplicationLabel(BER_TLV_IE, tag=0x50):
|
||||
# TODO: UCS-2 coding option as per Annex A of TS 102 221
|
||||
_construct = GreedyString('ascii')
|
||||
|
||||
# see https://github.com/PyCQA/pylint/issues/5794
|
||||
#pylint: disable=undefined-variable
|
||||
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
|
||||
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
|
||||
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
|
||||
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
|
||||
iso7816_4.ApplicationRelatedDOSet]):
|
||||
pass
|
||||
|
||||
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
|
||||
self._tlv = EF_DIR.ApplicationTemplate
|
||||
|
||||
|
||||
# TS 102 221 Section 13.4
|
||||
class EF_ARR(LinFixedEF):
|
||||
_test_de_encode = [
|
||||
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
|
||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||
{ "control_reference_template": "ADM1" } ],
|
||||
[ { "access_mode": [ "write_append", "update_erase" ] },
|
||||
{ "always": None } ],
|
||||
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
|
||||
{ "never": None } ],
|
||||
[ { "command_header": { "INS": 212 } },
|
||||
{ "control_reference_template": "ADM1" } ]
|
||||
] ),
|
||||
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
|
||||
[ [ { "access_mode": [ "read_search_compare" ] },
|
||||
{ "always": None } ],
|
||||
[ { "access_mode": [ "update_erase" ] },
|
||||
{ "never": None } ],
|
||||
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
|
||||
{ "control_reference_template": "ADM1" } ],
|
||||
[ { "command_header": { "INS": 212 } },
|
||||
{ "control_reference_template": "ADM1" } ]
|
||||
] ),
|
||||
]
|
||||
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc)
|
||||
# add those commands to the general commands of a TransparentEF
|
||||
self.shell_commands += [self.AddlShellCommands()]
|
||||
|
||||
@staticmethod
|
||||
def flatten(inp: list):
|
||||
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
|
||||
def sc_abbreviate(sc):
|
||||
if 'always' in sc:
|
||||
return 'always'
|
||||
elif 'never' in sc:
|
||||
return 'never'
|
||||
elif 'control_reference_template' in sc:
|
||||
return sc['control_reference_template']
|
||||
else:
|
||||
return sc
|
||||
|
||||
by_mode = {}
|
||||
for t in inp:
|
||||
am = t[0]
|
||||
sc = t[1]
|
||||
sc_abbr = sc_abbreviate(sc)
|
||||
if 'access_mode' in am:
|
||||
for m in am['access_mode']:
|
||||
by_mode[m] = sc_abbr
|
||||
elif 'command_header' in am:
|
||||
ins = am['command_header']['INS']
|
||||
if 'CLA' in am['command_header']:
|
||||
cla = am['command_header']['CLA']
|
||||
else:
|
||||
cla = None
|
||||
cmd = ts_102_22x_cmdset.lookup(ins, cla)
|
||||
if cmd:
|
||||
name = cmd.name.lower().replace(' ', '_')
|
||||
by_mode[name] = sc_abbr
|
||||
else:
|
||||
raise ValueError
|
||||
else:
|
||||
raise ValueError
|
||||
return by_mode
|
||||
|
||||
def _decode_record_bin(self, raw_bin_data, **kwargs):
|
||||
# we can only guess if we should decode for EF or DF here :(
|
||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||
dec = arr_seq.decode_multi(raw_bin_data)
|
||||
# we cannot pass the result through flatten() here, as we don't have a related
|
||||
# 'un-flattening' decoder, and hence would be unable to encode :(
|
||||
return dec[0]
|
||||
|
||||
def _encode_record_bin(self, in_json, **kwargs):
|
||||
# we can only guess if we should decode for EF or DF here :(
|
||||
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
|
||||
return arr_seq.encode_multi(in_json)
|
||||
|
||||
@with_default_category('File-Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
|
||||
def do_read_arr_record(self, opts):
|
||||
"""Read one EF.ARR record in flattened, human-friendly form."""
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
|
||||
data = self._cmd.lchan.selected_file.flatten(data)
|
||||
self._cmd.poutput_json(data, opts.oneline)
|
||||
|
||||
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
|
||||
def do_read_arr_records(self, opts):
|
||||
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
|
||||
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
|
||||
# collect all results in list so they are rendered as JSON list when printing
|
||||
data_list = []
|
||||
for recnr in range(1, 1 + num_of_rec):
|
||||
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
|
||||
data = self._cmd.lchan.selected_file.flatten(data)
|
||||
data_list.append(data)
|
||||
self._cmd.poutput_json(data_list, opts.oneline)
|
||||
|
||||
|
||||
# TS 102 221 Section 13.6
|
||||
class EF_UMPC(TransparentEF):
|
||||
_test_de_encode = [
|
||||
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
|
||||
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
|
||||
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
|
||||
"support_uicc_suspend": False } } ),
|
||||
]
|
||||
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
|
||||
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
|
||||
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
|
||||
support_uicc_suspend=2)
|
||||
self._construct = Struct(
|
||||
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
|
||||
|
||||
|
||||
class CardProfileUICC(CardProfile):
|
||||
|
||||
ORDER = 10
|
||||
|
||||
def __init__(self, name='UICC'):
|
||||
files = [
|
||||
EF_DIR(),
|
||||
EF_ICCID(),
|
||||
EF_PL(),
|
||||
EF_ARR(),
|
||||
# FIXME: DF.CD
|
||||
EF_UMPC(),
|
||||
]
|
||||
addons = [
|
||||
AddonSIM,
|
||||
AddonGSMR,
|
||||
AddonRUIM,
|
||||
]
|
||||
sw = {
|
||||
'Normal': {
|
||||
'9000': 'Normal ending of the command',
|
||||
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
|
||||
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
|
||||
},
|
||||
'Postponed processing': {
|
||||
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
|
||||
},
|
||||
'Warnings': {
|
||||
'6200': 'No information given, state of non-volatile memory unchanged',
|
||||
'6281': 'Part of returned data may be corrupted',
|
||||
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
|
||||
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
|
||||
'6284': 'Selected file in termination state',
|
||||
'62f1': 'More data available',
|
||||
'62f2': 'More data available and proactive command pending',
|
||||
'62f3': 'Response data available',
|
||||
'63f1': 'More data expected',
|
||||
'63f2': 'More data expected and proactive command pending',
|
||||
'63cx': 'Command successful but after using an internal update retry routine X times',
|
||||
},
|
||||
'Execution errors': {
|
||||
'6400': 'No information given, state of non-volatile memory unchanged',
|
||||
'6500': 'No information given, state of non-volatile memory changed',
|
||||
'6581': 'Memory problem',
|
||||
},
|
||||
'Checking errors': {
|
||||
'6700': 'Wrong length',
|
||||
'67xx': 'The interpretation of this status word is command dependent',
|
||||
'6b00': 'Wrong parameter(s) P1-P2',
|
||||
'6d00': 'Instruction code not supported or invalid',
|
||||
'6e00': 'Class not supported',
|
||||
'6f00': 'Technical problem, no precise diagnosis',
|
||||
'6fxx': 'The interpretation of this status word is command dependent',
|
||||
},
|
||||
'Functions in CLA not supported': {
|
||||
'6800': 'No information given',
|
||||
'6881': 'Logical channel not supported',
|
||||
'6882': 'Secure messaging not supported',
|
||||
},
|
||||
'Command not allowed': {
|
||||
'6900': 'No information given',
|
||||
'6981': 'Command incompatible with file structure',
|
||||
'6982': 'Security status not satisfied',
|
||||
'6983': 'Authentication/PIN method blocked',
|
||||
'6984': 'Referenced data invalidated',
|
||||
'6985': 'Conditions of use not satisfied',
|
||||
'6986': 'Command not allowed (no EF selected)',
|
||||
'6989': 'Command not allowed - secure channel - security not satisfied',
|
||||
},
|
||||
'Wrong parameters': {
|
||||
'6a80': 'Incorrect parameters in the data field',
|
||||
'6a81': 'Function not supported',
|
||||
'6a82': 'File not found',
|
||||
'6a83': 'Record not found',
|
||||
'6a84': 'Not enough memory space',
|
||||
'6a86': 'Incorrect parameters P1 to P2',
|
||||
'6a87': 'Lc inconsistent with P1 to P2',
|
||||
'6a88': 'Referenced data not found',
|
||||
},
|
||||
'Application errors': {
|
||||
'9850': 'INCREASE cannot be performed, max value reached',
|
||||
'9862': 'Authentication error, application specific',
|
||||
'9863': 'Security session or association expired',
|
||||
'9864': 'Minimum UICC suspension time is too long',
|
||||
},
|
||||
}
|
||||
|
||||
super().__init__(name, desc='ETSI TS 102 221', cla="00",
|
||||
sel_ctrl="0004", files_in_mf=files, sw=sw,
|
||||
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
|
||||
|
||||
@staticmethod
|
||||
def decode_select_response(data_hex: str) -> object:
|
||||
"""ETSI TS 102 221 Section 11.1.1.3"""
|
||||
t = FcpTemplate()
|
||||
t.from_tlv(h2b(data_hex))
|
||||
d = t.to_dict()
|
||||
return flatten_dict_lists(d['fcp_template'])
|
||||
|
||||
@classmethod
|
||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
|
||||
card is considered a UICC card."""
|
||||
cls._mf_select_test(scc, "00", "0004", ["3f00"])
|
||||
|
||||
@with_default_category('TS 102 221 Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
suspend_uicc_parser = argparse.ArgumentParser()
|
||||
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
|
||||
help='Proposed minimum duration of suspension')
|
||||
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
|
||||
help='Proposed maximum duration of suspension')
|
||||
|
||||
# not ISO7816-4 but TS 102 221
|
||||
@cmd2.with_argparser(suspend_uicc_parser)
|
||||
def do_suspend_uicc(self, opts):
|
||||
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
|
||||
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
|
||||
max_len_secs=opts.max_duration_secs)
|
||||
self._cmd.poutput(
|
||||
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
|
||||
|
||||
resume_uicc_parser = argparse.ArgumentParser()
|
||||
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
|
||||
|
||||
@cmd2.with_argparser(resume_uicc_parser)
|
||||
def do_resume_uicc(self, opts):
|
||||
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
|
||||
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
|
||||
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
|
||||
self._cmd.card._scc.resume_uicc(opts.TOKEN)
|
||||
|
||||
term_cap_parser = argparse.ArgumentParser()
|
||||
# power group
|
||||
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
|
||||
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
|
||||
help='Actual used Supply voltage class')
|
||||
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
|
||||
help='Maximum available power supply of the terminal')
|
||||
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
|
||||
help='Actual used clock frequency (in units of 100kHz)')
|
||||
# no separate groups for those two
|
||||
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
|
||||
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
|
||||
help='Extended Logical Channel supported')
|
||||
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
|
||||
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
|
||||
help='Local User Interface in the Device (LUId) supported')
|
||||
# eUICC group
|
||||
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
|
||||
tc_euicc_grp.add_argument('--lui-d', action='store_true',
|
||||
help='Local User Interface in the Device (LUId) supported')
|
||||
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
|
||||
help='Local Profile Download in the Device (LPDd) supported')
|
||||
tc_euicc_grp.add_argument('--lds-d', action='store_true',
|
||||
help='Local Discovery Service in the Device (LPDd) supported')
|
||||
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
|
||||
help='LUIe based on SCWS supported')
|
||||
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
|
||||
help='Metadata update alerting supported')
|
||||
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
|
||||
help='Enterprise Capable Device')
|
||||
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
|
||||
help='LUIe using E4E (ENVELOPE tag E4) supported')
|
||||
tc_euicc_grp.add_argument('--lpr', action='store_true',
|
||||
help='LPR (LPA Proxy) supported')
|
||||
|
||||
@cmd2.with_argparser(term_cap_parser)
|
||||
def do_terminal_capability(self, opts):
|
||||
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
|
||||
ps_flags = {}
|
||||
addl_if_flags = {}
|
||||
euicc_flags = {}
|
||||
|
||||
opts_dict = vars(opts)
|
||||
|
||||
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
|
||||
if any(opts_dict[x] for x in power_items):
|
||||
if not all(opts_dict[x] for x in power_items):
|
||||
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
|
||||
|
||||
for k, v in opts_dict.items():
|
||||
if k in AdditionalInterfacesSupport._construct.flags.keys():
|
||||
addl_if_flags[k] = v
|
||||
elif k in AdditionalTermCapEuicc._construct.flags.keys():
|
||||
euicc_flags[k] = v
|
||||
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
|
||||
if k == 'used_supply_voltage_class' and v:
|
||||
v = {v: True}
|
||||
ps_flags[k] = v
|
||||
|
||||
child_list = []
|
||||
if any(x for x in ps_flags.values()):
|
||||
child_list.append(TerminalPowerSupply(decoded=ps_flags))
|
||||
|
||||
if opts.extended_logical_channel:
|
||||
child_list.append(ExtendedLchanTerminalSupport())
|
||||
if any(x for x in addl_if_flags.values()):
|
||||
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
|
||||
if any(x for x in euicc_flags.values()):
|
||||
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
|
||||
|
||||
print(child_list)
|
||||
tc = TerminalCapability(children=child_list)
|
||||
self.terminal_capability(b2h(tc.to_tlv()))
|
||||
|
||||
def terminal_capability(self, data:Hexstr):
|
||||
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
|
||||
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
|
||||
|
||||
@@ -36,14 +36,13 @@ from osmocom.utils import is_hexstr
|
||||
from osmocom.tlv import *
|
||||
from osmocom.construct import *
|
||||
|
||||
import pySim.ts_102_221
|
||||
from pySim.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
|
||||
from pySim.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
|
||||
from pySim.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
|
||||
from pySim.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
|
||||
from pySim.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
|
||||
from pySim.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
|
||||
from pySim.ts_102_221 import EF_ARR
|
||||
from pySim.profile.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
|
||||
from pySim.profile.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
|
||||
from pySim.profile.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
|
||||
from pySim.profile.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
|
||||
from pySim.profile.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
|
||||
from pySim.profile.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
|
||||
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||
from pySim.filesystem import *
|
||||
from pySim.ts_31_102_telecom import DF_PHONEBOOK, EF_UServiceTable
|
||||
from pySim.ts_31_103_shared import EF_IMSConfigData, EF_XCAPConfigData, EF_MuDMiDConfigData
|
||||
@@ -1750,7 +1749,7 @@ class ADF_USIM(CardADF):
|
||||
self.add_files(files)
|
||||
|
||||
def decode_select_response(self, data_hex):
|
||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
||||
return CardProfileUICC.decode_select_response(data_hex)
|
||||
|
||||
@with_default_category('Application-Specific Commands')
|
||||
class AddlShellCommands(CommandSet):
|
||||
|
||||
@@ -27,12 +27,11 @@ from osmocom.utils import *
|
||||
from osmocom.tlv import *
|
||||
from osmocom.construct import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
|
||||
from pySim.profile.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
|
||||
from pySim.ts_31_102 import ADF_USIM, EF_FromPreferred
|
||||
from pySim.ts_31_102_telecom import EF_UServiceTable
|
||||
from pySim.ts_31_103_shared import *
|
||||
import pySim.ts_102_221
|
||||
from pySim.ts_102_221 import EF_ARR
|
||||
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||
|
||||
# Mapping between ISIM Service Number and its description
|
||||
EF_IST_map = {
|
||||
@@ -226,7 +225,7 @@ class ADF_ISIM(CardADF):
|
||||
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
||||
|
||||
def decode_select_response(self, data_hex):
|
||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
||||
return CardProfileUICC.decode_select_response(data_hex)
|
||||
|
||||
|
||||
# TS 31.103 Section 7.1
|
||||
|
||||
@@ -24,9 +24,8 @@ from osmocom.utils import *
|
||||
from osmocom.tlv import *
|
||||
from pySim.filesystem import *
|
||||
from pySim.ts_31_102 import ADF_USIM
|
||||
from pySim.ts_51_011 import EF_IMSI, EF_AD
|
||||
import pySim.ts_102_221
|
||||
from pySim.ts_102_221 import EF_ARR
|
||||
from pySim.profile.ts_51_011 import EF_IMSI, EF_AD
|
||||
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
|
||||
|
||||
|
||||
class ADF_HPSIM(CardADF):
|
||||
@@ -44,7 +43,7 @@ class ADF_HPSIM(CardADF):
|
||||
self.shell_commands += [ADF_USIM.AddlShellCommands()]
|
||||
|
||||
def decode_select_response(self, data_hex):
|
||||
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
|
||||
return CardProfileUICC.decode_select_response(data_hex)
|
||||
|
||||
|
||||
# TS 31.104 Section 7.1
|
||||
|
||||
3
pySim/wsrc/__init__.py
Normal file
3
pySim/wsrc/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
|
||||
WSRC_DEFAULT_PORT_USER = 4220
|
||||
WSRC_DEFAULT_PORT_CARD = 4221
|
||||
68
pySim/wsrc/client_blocking.py
Normal file
68
pySim/wsrc/client_blocking.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""Connect smartcard to a remote server, so the remote server can take control and
|
||||
perform commands on it."""
|
||||
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
from websockets.sync.client import connect
|
||||
from osmocom.utils import b2h
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WsClientBlocking(abc.ABC):
|
||||
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
|
||||
|
||||
def __init__(self, cltype: str, ws_uri: str):
|
||||
self.client_type = cltype
|
||||
self.ws_uri = ws_uri
|
||||
self.ws = None
|
||||
|
||||
def connect(self):
|
||||
self.ws = connect(uri=self.ws_uri)
|
||||
self.perform_outbound_hello()
|
||||
|
||||
def tx_json(self, msg_type: str, d: dict = {}):
|
||||
"""JSON-Encode and transmit a message to the given websocket."""
|
||||
d['msg_type'] = msg_type
|
||||
d_js = json.dumps(d)
|
||||
logger.debug("Tx: %s", d_js)
|
||||
self.ws.send(d_js)
|
||||
|
||||
def tx_error(self, message: str):
|
||||
event = {
|
||||
'message': message,
|
||||
}
|
||||
self.tx_json('error', event)
|
||||
|
||||
def rx_json(self):
|
||||
"""Receive a single message from the given websocket and JSON-decode it."""
|
||||
rx = self.ws.recv()
|
||||
rx_js = json.loads(rx)
|
||||
logger.debug("Rx: %s", rx_js)
|
||||
assert 'msg_type' in rx
|
||||
return rx_js
|
||||
|
||||
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
|
||||
self.tx_json(tx_msg_type, tx_d)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == rx_msg_type
|
||||
return rx
|
||||
|
||||
def perform_outbound_hello(self, tx: dict = {}):
|
||||
if not 'client_type' in tx:
|
||||
tx['client_type'] = self.client_type
|
||||
self.tx_json('hello', tx)
|
||||
rx = self.rx_json()
|
||||
assert rx['msg_type'] == 'hello_ack'
|
||||
return rx
|
||||
|
||||
def rx_and_execute_cmd(self):
|
||||
"""Receve and dispatch/execute a single command from the server."""
|
||||
rx = self.rx_json()
|
||||
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
|
||||
if handler:
|
||||
handler(rx)
|
||||
else:
|
||||
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
|
||||
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])
|
||||
@@ -25,10 +25,10 @@ import pySim.ts_102_221
|
||||
import pySim.ts_102_222
|
||||
import pySim.ts_31_102
|
||||
import pySim.ts_31_103
|
||||
import pySim.ts_51_011
|
||||
import pySim.profile.ts_51_011
|
||||
import pySim.sysmocom_sja2
|
||||
import pySim.gsm_r
|
||||
import pySim.cdma_ruim
|
||||
import pySim.profile.gsm_r
|
||||
import pySim.profile.cdma_ruim
|
||||
|
||||
from construct import Int8ub, Struct, Padding, this
|
||||
from osmocom.tlv import BER_TLV_IE
|
||||
|
||||
@@ -26,10 +26,10 @@ import pySim.ts_102_221
|
||||
import pySim.ts_102_222
|
||||
import pySim.ts_31_102
|
||||
import pySim.ts_31_103
|
||||
import pySim.ts_51_011
|
||||
import pySim.profile.ts_51_011
|
||||
import pySim.sysmocom_sja2
|
||||
import pySim.gsm_r
|
||||
import pySim.cdma_ruim
|
||||
import pySim.profile.gsm_r
|
||||
import pySim.profile.cdma_ruim
|
||||
import pySim.global_platform
|
||||
import pySim.global_platform.http
|
||||
|
||||
|
||||
Reference in New Issue
Block a user