5 Commits

Author SHA1 Message Date
Harald Welte
a9bb63a2db pySim.commands: Move away from pytlv for FCP parsing
This is the last pytlv use in non-legacy code, let's replace our
open-coded raw FCP parsing with the proper FCP parser we have in
ts_102_2221.py.

Change-Id: Ic5c5eec3b654b25e70c23c020910353cc5919aa9
2024-11-22 11:17:05 +01:00
Harald Welte
a01e87da77 split pySim.profile.euicc from pySim.euicc
Change-Id: I2b922622669a2b5ad496ea22fa67656a44096f4a
2024-11-22 11:17:05 +01:00
Harald Welte
53e840ad86 move more profiles (gsm_r, cmda_ruim) to pySim.profile
Change-Id: Id86aa67e70f2a667d9c70ba51e07f653990adaa4
2024-11-22 11:17:05 +01:00
Harald Welte
d15c3d1319 ts_102_221: split CardProfileEUICC to pySim.profile.ts_102_221
This avoids circular imports

Change-Id: I35b565837949de2c1286133850cb8c1e8d5ca844
2024-11-22 11:17:05 +01:00
Harald Welte
671b0f19b6 WIP: initial step towards websocket-based remote card [reader] access
Change-Id: I588bf4b3d9891766dd688a6818057ca20fb26e3f
2024-11-22 11:17:01 +01:00
31 changed files with 1202 additions and 488 deletions

112
contrib/wsrc_card_client.py Executable file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import sys
import json
import logging
import argparse
from osmocom.utils import b2h
import websockets
from pySim.transport import init_reader, argparse_add_reader_args, LinkBase
from pySim.commands import SimCardCommands
from pySim.wsrc.client_blocking import WsClientBlocking
from pySim.exceptions import NoCardError
from pySim.wsrc import WSRC_DEFAULT_PORT_CARD
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
class CardWsClientBlocking(WsClientBlocking):
"""Implementation of the card (reader) client of the WSRC (WebSocket Remote Card) protocol"""
def __init__(self, ws_uri, tp: LinkBase):
super().__init__('card', ws_uri)
self.tp = tp
def perform_outbound_hello(self):
hello_data = {
'atr': b2h(self.tp.get_atr()),
# TODO: include various card information in the HELLO message
}
super().perform_outbound_hello(hello_data)
def handle_rx_c_apdu(self, rx: dict):
"""handle an inbound APDU transceive command"""
data, sw = self.tp.send_apdu(rx['command'])
tx = {
'response': data,
'sw': sw,
}
self.tx_json('r_apdu', tx)
def handle_rx_disconnect(self, rx: dict):
"""server tells us to disconnect"""
self.tx_json('disconnect_ack')
# FIXME: tear down connection and/or terminate entire program
def handle_rx_state_notification(self, rx: dict):
logger.info("State Notification: %s" % rx['new_state'])
def handle_rx_print(self, rx: dict):
"""print a message (text) given by server to the local console/log"""
logger.info("SERVER MSG: %s" % rx['message'])
# no response
def handle_rx_reset_req(self, rx: dict):
"""server tells us to reset the card"""
self.tp.reset_card()
self.tx_json('reset_resp', {'atr': b2h(self.tp.get_atr())})
parser = argparse.ArgumentParser()
argparse_add_reader_args(parser)
parser.add_argument("--uri", default="ws://localhost:%u/" % (WSRC_DEFAULT_PORT_CARD),
help="URI of the sever to which to connect")
if __name__ == '__main__':
opts = parser.parse_args()
# open the card reader / slot
logger.info("Initializing Card Reader...")
try:
tp = init_reader(opts)
except Exception as e:
logger.fatal("Error opening reader: %s" % e)
sys.exit(1)
logger.info("Connecting to Card...")
try:
tp.connect()
except NoCardError as e:
logger.fatal("Error opening card! Is a card inserted in the reader?")
sys.exit(1)
scc = SimCardCommands(transport=tp)
logger.info("Detected Card with ATR: %s" % b2h(tp.get_atr()))
# TODO: gather various information about the card; print it
# create + connect the client to the server
cl = CardWsClientBlocking(opts.uri, tp)
logger.info("Connecting to remote server...")
try:
cl.connect()
logger.info("Successfully connected to Server")
except ConnectionRefusedError as e:
logger.fatal(e)
sys.exit(1)
try:
while True:
# endless loop: wait for inbound command from server + execute it
cl.rx_and_execute_cmd()
except websockets.exceptions.ConnectionClosedOK as e:
print(e)
sys.exit(1)
except KeyboardInterrupt as e:
print(e.__class__.__name__)
sys.exit(2)

354
contrib/wsrc_server.py Executable file
View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python
import json
import asyncio
import logging
import argparse
from typing import Optional, Tuple
from websockets.asyncio.server import serve
from websockets.exceptions import ConnectionClosedError
from osmocom.utils import Hexstr, swap_nibbles
from pySim.utils import SwMatchstr, ResTuple, sw_match, dec_iccid
from pySim.exceptions import SwMatchError
from pySim.wsrc import WSRC_DEFAULT_PORT_USER, WSRC_DEFAULT_PORT_CARD
logging.basicConfig(format="[%(levelname)s] %(asctime)s %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
card_clients = set()
user_clients = set()
class WsClientLogAdapter(logging.LoggerAdapter):
"""LoggerAdapter adding context (for now: remote IP/Port of client) to log"""
def process(self, msg, kwargs):
return '%s([%s]:%u) %s' % (self.extra['type'], self.extra['remote_addr'][0],
self.extra['remote_addr'][1], msg), kwargs
class WsClient:
def __init__(self, websocket, hello: dict):
self.websocket = websocket
self.hello = hello
self.identity = {}
self.logger = WsClientLogAdapter(logger, {'type': self.__class__.__name__,
'remote_addr': websocket.remote_address})
def __str__(self):
return '%s([%s]:%u)' % (self.__class__.__name__, self.websocket.remote_address[0],
self.websocket.remote_address[1])
async def rx_json(self):
rx = await self.websocket.recv()
rx_js = json.loads(rx)
self.logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
async def tx_json(self, msg_type:str, d: dict = {}):
"""Transmit a json-serializable dict to the peer"""
d['msg_type'] = msg_type
d_js = json.dumps(d)
self.logger.debug("Tx: %s", d_js)
await self.websocket.send(d_js)
async def tx_hello_ack(self):
await self.tx_json('hello_ack')
async def xceive_json(self, msg_type:str, d:dict = {}, exp_msg_type:Optional[str] = None) -> dict:
await self.tx_json(msg_type, d)
rx = await self.rx_json()
if exp_msg_type:
assert rx['msg_type'] == exp_msg_type
return rx;
async def tx_error(self, message: str):
"""Transmit an error message to the peer"""
event = {
"message": message,
}
self.logger.error("Transmitting error message: '%s'" % message)
await self.tx_json('error', event)
# async def ws_hdlr(self):
# """kind of a 'main' function for the websocket client: wait for incoming message,
# and handle it."""
# try:
# async for message in self.websocket:
# method = getattr(self, 'handle_rx_%s' % message['msg_type'], None)
# if not method:
# await self.tx_error("Unknonw msg_type: %s" % message['msg_type'])
# else:
# method(message)
# except ConnectionClosedError:
# # we handle this in the outer loop
# pass
class CardClient(WsClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.identity['ATR'] = self.hello['atr']
self.state = 'init'
def __str__(self):
eid = self.identity.get('EID', None)
if eid:
return '%s(EID=%s)' % (self.__class__.__name__, eid)
iccid = self.identity.get('ICCID', None)
if iccid:
return '%s(ICCID=%s)' % (self.__class__.__name__, iccid)
return super().__str__()
def listing_entry(self) -> dict:
return {'remote_addr': self.websocket.remote_address,
'identities': self.identity,
'state': self.state}
"""A websocket client that represents a reader/card. This is what we use to talk to a card"""
async def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
"""transceive a single APDU with the card"""
message = await self.xceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return message['response'], message['sw']
async def xceive_apdu(self, pdu: Hexstr) -> ResTuple:
"""transceive an APDU with the card, handling T=0 GET_RESPONSE cases"""
prev_pdu = pdu
data, sw = await self.xceive_apdu_raw(pdu)
if sw is not None:
while (sw[0:2] in ['9f', '61', '62', '63']):
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2
# SW1=62: ETSI TS 102 221 7.3.1.1.4 Clause 4b): 62xx, 63xx, 9xxx != 9000
pdu_gr = pdu[0:2] + 'c00000' + sw[2:4]
prev_pdu = pdu_gr
d, sw = await self.xceive_apdu_raw(pdu_gr)
data += d
if sw[0:2] == '6c':
# SW1=6C: ETSI TS 102 221 Table 7.1: Procedure byte coding
pdu_gr = prev_pdu[0:8] + sw[2:4]
data, sw = await self.xceive_apdu_raw(pdu_gr)
return data, sw
async def xceive_apdu_checksw(self, pdu: Hexstr, sw: SwMatchstr = "9000") -> ResTuple:
"""like xceive_apdu, but checking the status word matches the expected pattern"""
rv = await self.xceive_apdu(pdu)
last_sw = rv[1]
if not sw_match(rv[1], sw):
raise SwMatchError(rv[1], sw.lower())
return rv
async def card_reset(self):
"""reset the card"""
rx = await self.xceive_json('reset_req', exp_msg_type='reset_resp')
self.identity['ATR'] = rx['atr']
async def notify_state(self):
"""notify the card client of a state [change]"""
await self.tx_json('state_notification', {'new_state': self.state})
async def get_iccid(self):
"""high-level method to obtain the ICCID of the card"""
await self.xceive_apdu_checksw('00a40000023f00') # SELECT MF
await self.xceive_apdu_checksw('00a40000022fe2') # SELECT EF.ICCID
res, sw = await self.xceive_apdu_checksw('00b0000000') # READ BINARY
return dec_iccid(res)
async def get_eid_sgp22(self):
"""high-level method to obtain the EID of a SGP.22 consumer eUICC"""
await self.xceive_apdu_checksw('00a4040410a0000005591010ffffffff8900000100')
res, sw = await self.xceive_apdu_checksw('80e2910006bf3e035c015a')
return res[-32:]
async def set_state(self, new_state:str):
self.logger.info("Card now in '%s' state" % new_state)
self.state = new_state
await self.notify_state()
async def identify(self):
# identify the card by asking for its EID and/or ICCID
try:
eid = await self.get_eid_sgp22()
self.logger.debug("EID: %s", eid)
self.identity['EID'] = eid
except SwMatchError:
pass
try:
iccid = await self.get_iccid()
self.logger.debug("ICCID: %s", iccid)
self.identity['ICCID'] = iccid
except SwMatchError:
pass
await self.set_state('ready')
async def associate_user(self, user):
assert self.state == 'ready'
self.user = user
await self.set_state('associated')
async def disassociate_user(self):
assert self.user
assert self.state == 'associated'
await self.set_state('ready')
@staticmethod
def find_client_for_id(id_type: str, id_str: str) -> Optional['CardClient']:
for c in card_clients:
if c.state != 'ready':
continue
c_id = c.identity.get(id_type.upper(), None)
if c_id and c_id.lower() == id_str.lower():
return c
return None
class UserClient(WsClient):
"""A websocket client representing a user application like pySim-shell."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 'init'
self.card = None
async def associate_card(self, card: CardClient):
assert self.state == 'init'
self.card = card
self.state = 'associated'
async def disassociate_card(self):
assert self.state == 'associated'
self.card = None
self.state = 'init'
async def state_init(self):
"""Wait for incoming 'select_card' and process it."""
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'select_card':
# look-up if the card can be found
card = CardClient.find_client_for_id(rx['id_type'], rx['id_str'])
if not card:
await self.tx_error('No CardClient found for %s == %s' % (rx['id_type'], rx['id_str']))
continue
# transition to next statee
await card.associate_user(self)
await self.associate_card(card)
await self.tx_json('select_card_ack', {'identities': card.identity})
break
elif rx['msg_type'] == 'list_cards':
res = [x.listing_entry() for x in card_clients]
await self.tx_json('list_cards_ack', {'cards': res})
else:
self.tx_error('Unknown message type %s' % rx['msg_type'])
async def state_selected(self):
while True:
rx = await self.rx_json()
if rx['msg_type'] == 'c_apdu':
rsp, sw = await self.card.xceive_apdu_raw(rx['command'])
await self.tx_json('r_apdu', {'response': rsp, 'sw': sw})
elif rx['msg_type'] == 'reset_req':
await self.card.card_reset()
await self.tx_json('reset_resp')
else:
self.logger.warning("Unknown/unsupported command '%s' received" % rx['msg_type'])
async def card_conn_hdlr(websocket):
"""Handler for incoming connection to 'card' port."""
# receive first message, which should be a 'hello'
rx_raw = await websocket.recv()
rx = json.loads(rx_raw)
assert rx['msg_type'] == 'hello'
client_type = rx['client_type']
if client_type != 'card':
logger.error("Rejecting client (unknown type %s) connection", client_type)
raise ValueError("client_type '%s' != expected 'card'" % client_type)
card = CardClient(websocket, rx)
card.logger.info("connection accepted")
# first go through identity phase
async with asyncio.timeout(30):
await card.tx_hello_ack()
card.logger.info("hello-handshake completed")
card_clients.add(card)
# first obtain the identity of the card
await card.identify()
# then go into the "main loop"
try:
# wait 'indefinitely'. We cannot call websocket.recv() here, as we will call another
# recv() while waiting for the R-APDU after forwarding one from the user client.
await websocket.wait_closed()
finally:
card.logger.info("connection closed")
card_clients.remove(card)
async def user_conn_hdlr(websocket):
"""Handler for incoming connection to 'user' port."""
# receive first message, which should be a 'hello'
rx_raw = await websocket.recv()
rx = json.loads(rx_raw)
assert rx['msg_type'] == 'hello'
client_type = rx['client_type']
if client_type != 'user':
logger.error("Rejecting client (unknown type %s) connection", client_type)
raise ValueError("client_type '%s' != expected 'card'" % client_type)
user = UserClient(websocket, rx)
user.logger.info("connection accepted")
# first go through hello phase
async with asyncio.timeout(10):
await user.tx_hello_ack()
user.logger.info("hello-handshake completed")
user_clients.add(user)
# first wait for the user to specify the select the card
try:
await user.state_init()
except ConnectionClosedError:
user.logger.info("connection closed")
user_clients.remove(user)
return
except asyncio.exceptions.CancelledError: # direct cause of TimeoutError
user.logger.error("User failed to transition to 'associated' state within 10s")
user_clients.remove(user)
return
except Exception as e:
user.logger.error("Unknown exception %s", e)
raise e
user_clients.remove(user)
return
# then perform APDU exchanges without any time limit
try:
await user.state_selected()
except ConnectionClosedError:
pass
finally:
user.logger.info("connection closed")
await user.card.disassociate_user()
await user.disassociate_card()
user_clients.remove(user)
parser = argparse.ArgumentParser(description="""Osmocom WebSocket Remote Card server""")
parser.add_argument('--user-bind-port', type=int, default=WSRC_DEFAULT_PORT_USER,
help="port number to bind for connections from users")
parser.add_argument('--user-bind-host', type=str, default='localhost',
help="local Host/IP to bind for connections from users")
parser.add_argument('--card-bind-port', type=int, default=WSRC_DEFAULT_PORT_CARD,
help="port number to bind for connections from cards")
parser.add_argument('--card-bind-host', type=str, default='localhost',
help="local Host/IP to bind for connections from cards")
if __name__ == '__main__':
opts = parser.parse_args()
async def main():
# we use different ports for user + card connections to ensure they can
# have different packet filter rules apply to them
async with serve(card_conn_hdlr, opts.card_bind_host, opts.card_bind_port), serve(user_conn_hdlr, opts.user_bind_host, opts.user_bind_port):
await asyncio.get_running_loop().create_future() # run forever
asyncio.run(main())

View File

@@ -43,6 +43,7 @@ pySim consists of several parts:
legacy
library
osmo-smdpp
wsrc
Indices and tables

57
docs/wsrc.rst Normal file
View File

@@ -0,0 +1,57 @@
WebSocket Remote Card (WSRC)
============================
WSRC (*Web Socket Remote Card*) is a mechanism by which card readers can be made remotely available
via a computer network. The transport mechanism is (as the name implies) a WebSocket. This transport
method was chosen to be as firewall/NAT friendly as possible.
WSRC Network Architecture
-------------------------
In a WSRC network, there are three major elements:
* The **WSRC Card Client** which exposes a locally attached smart card (usually via a Smart Card Reader)
to a remote *WSRC Server*
* The **WSRC Server** manges incoming connections from both *WSRC Card Clients* as well as *WSRC User Clients*
* The **WSRC User Client** is a user application, like for example pySim-shell, which is accessing a remote
card by connecting to the *WSRC Server* which relays the information to the selected *WSRC Card Client*
WSRC Protocol
-------------
The WSRC protocl consits of JSON objects being sent over a websocket. The websocket communication itself
is based on HTTP and should usually operate via TLS for security reasons.
The detailed protocol is currently still WIP. The plan is to document it here.
pySim implementations
---------------------
wsrc_server
~~~~~~~~~~~~~~~~
.. argparse::
:filename: ../contrib/wsrc_server.py
:func: parser
:prog: contrib/wsrc_server.py
wsrc_card_client
~~~~~~~~~~~~~~~~
.. argparse::
:filename: ../contrib/wsrc_card_client.py
:func: parser
:prog: contrib/wsrc_card_client.py
pySim-shell
~~~~~~~~~~~
pySim-shell can talk to a remote card via WSRC if you use the *wsrc transport*, for example like this:
::
./pySim-shell.py --wsrc-eid 89882119900000000000000000007280 --wsrc-serer-url ws://localhost:4220/
You can specify `--wsrc-eid` or `--wsrc-iccid` to identify the remote eUICC or UICC you would like to select.

View File

@@ -39,7 +39,7 @@ from pySim.commands import SimCardCommands
from pySim.transport import init_reader, argparse_add_reader_args
from pySim.legacy.cards import _cards_classes, card_detect
from pySim.utils import derive_milenage_opc, calculate_luhn, dec_iccid
from pySim.ts_51_011 import EF_AD
from pySim.profile.ts_51_011 import EF_AD
from pySim.legacy.ts_51_011 import EF
from pySim.card_handler import *
from pySim.utils import *

View File

@@ -31,7 +31,7 @@ import sys
from osmocom.utils import h2b, h2s, swap_nibbles, rpad
from pySim.ts_51_011 import EF_SST_map, EF_AD
from pySim.profile.ts_51_011 import EF_SST_map, EF_AD
from pySim.legacy.ts_51_011 import EF, DF
from pySim.ts_31_102 import EF_UST_map
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map

View File

@@ -60,7 +60,6 @@ from pySim.card_handler import CardHandler, CardHandlerAuto
from pySim.filesystem import CardMF, CardEF, CardDF, CardADF, LinFixedEF, TransparentEF, BerTlvEF
from pySim.ts_102_221 import pin_names
from pySim.ts_102_222 import Ts102222Commands
from pySim.gsm_r import DF_EIRENE
from pySim.cat import ProactiveCommand
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field

View File

@@ -13,7 +13,7 @@ from osmocom.utils import JsonEncoder
from pySim.cards import UiccCardBase
from pySim.commands import SimCardCommands
from pySim.profile import CardProfile
from pySim.ts_102_221 import CardProfileUICC
from pySim.profile.ts_102_221 import CardProfileUICC
from pySim.ts_31_102 import CardApplicationUSIM
from pySim.ts_31_103 import CardApplicationISIM
from pySim.euicc import CardApplicationISDR, CardApplicationECASD

View File

@@ -22,8 +22,8 @@ from pySim.filesystem import CardModel, CardApplication
from pySim.cards import card_detect, SimCardBase, UiccCardBase, CardBase
from pySim.runtime import RuntimeState
from pySim.profile import CardProfile
from pySim.cdma_ruim import CardProfileRUIM
from pySim.ts_102_221 import CardProfileUICC
from pySim.profile.cdma_ruim import CardProfileRUIM
from pySim.profile.ts_102_221 import CardProfileUICC
from pySim.utils import all_subclasses
from pySim.exceptions import SwMatchError
@@ -40,7 +40,7 @@ import pySim.ts_31_103
import pySim.ts_31_104
import pySim.ara_m
import pySim.global_platform
import pySim.euicc
import pySim.profile.euicc
def init_card(sl: LinkBase, skip_card_init: bool = False) -> Tuple[RuntimeState, SimCardBase]:
"""

View File

@@ -25,8 +25,8 @@
from typing import Optional, Tuple
from osmocom.utils import *
from pySim.ts_102_221 import EF_DIR, CardProfileUICC
from pySim.ts_51_011 import DF_GSM
from pySim.profile.ts_102_221 import EF_DIR, CardProfileUICC
from pySim.profile.ts_51_011 import DF_GSM
from pySim.utils import SwHexstr
from pySim.commands import Path, SimCardCommands

View File

@@ -32,6 +32,7 @@ from osmocom.tlv import bertlv_encode_len
from pySim.utils import sw_match, expand_hex, SwHexstr, ResTuple, SwMatchstr
from pySim.exceptions import SwMatchError
from pySim.transport import LinkBase
from pySim.ts_102_221 import decode_select_response
# A path can be either just a FID or a list of FID
Path = typing.Union[Hexstr, List[Hexstr]]
@@ -176,40 +177,6 @@ class SimCardCommands:
raise SwMatchError(sw, sw_exp.lower(), self._tp.sw_interpreter)
return (rsp, sw)
# Extract a single FCP item from TLV
def __parse_fcp(self, fcp: Hexstr):
# see also: ETSI TS 102 221, chapter 11.1.1.3.1 Response for MF,
# DF or ADF
from pytlv.TLV import TLV
tlvparser = TLV(['82', '83', '84', 'a5', '8a', '8b',
'8c', '80', 'ab', 'c6', '81', '88'])
# pytlv is case sensitive!
fcp = fcp.lower()
if fcp[0:2] != '62':
raise ValueError(
'Tag of the FCP template does not match, expected 62 but got %s' % fcp[0:2])
# Unfortunately the spec is not very clear if the FCP length is
# coded as one or two byte vale, so we have to try it out by
# checking if the length of the remaining TLV string matches
# what we get in the length field.
# See also ETSI TS 102 221, chapter 11.1.1.3.0 Base coding.
# TODO: this likely just is normal BER-TLV ("All data objects are BER-TLV except if otherwise # defined.")
exp_tlv_len = int(fcp[2:4], 16)
if len(fcp[4:]) // 2 == exp_tlv_len:
skip = 4
else:
exp_tlv_len = int(fcp[2:6], 16)
if len(fcp[4:]) // 2 == exp_tlv_len:
skip = 6
raise ValueError('Cannot determine length of TLV-length')
# Skip FCP tag and length
tlv = fcp[skip:]
return tlvparser.parse(tlv)
# Tell the length of a record by the card response
# USIMs respond with an FCP template, which is different
# from what SIMs responds. See also:
@@ -217,10 +184,8 @@ class SimCardCommands:
# SIM: GSM 11.11, chapter 9.2.1 SELECT
def __record_len(self, r) -> int:
if self.sel_ctrl == "0004":
tlv_parsed = self.__parse_fcp(r[-1])
file_descriptor = tlv_parsed['82']
# See also ETSI TS 102 221, chapter 11.1.1.4.3 File Descriptor
return int(file_descriptor[4:8], 16)
fcp_parsed = decode_select_response(r[-1])
return fcp_parsed['file_descriptor']['record_len']
else:
return int(r[-1][28:30], 16)
@@ -228,8 +193,8 @@ class SimCardCommands:
# above.
def __len(self, r) -> int:
if self.sel_ctrl == "0004":
tlv_parsed = self.__parse_fcp(r[-1])
return int(tlv_parsed['80'], 16)
fcp_parsed = decode_select_response(r[-1])
return fcp_parsed['file_size']
else:
return int(r[-1][4:8], 16)

View File

@@ -34,7 +34,6 @@ from osmocom.construct import *
from pySim.exceptions import SwMatchError
from pySim.utils import Hexstr, SwHexstr, SwMatchstr
from pySim.commands import SimCardCommands
from pySim.ts_102_221 import CardProfileUICC
import pySim.global_platform
# SGP.02 Section 2.2.2
@@ -557,43 +556,3 @@ class CardApplicationECASD(pySim.global_platform.CardApplicationSD):
@with_default_category('Application-Specific Commands')
class AddlShellCommands(CommandSet):
pass
class CardProfileEuiccSGP32(CardProfileUICC):
ORDER = 5
def __init__(self):
super().__init__(name='IoT eUICC (SGP.32)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
# try a command only supported by SGP.32
scc.cla_byte = "00"
scc.select_adf(AID_ISD_R)
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
class CardProfileEuiccSGP22(CardProfileUICC):
ORDER = 6
def __init__(self):
super().__init__(name='Consumer eUICC (SGP.22)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
# try to read EID from ISD-R
scc.cla_byte = "00"
scc.select_adf(AID_ISD_R)
eid = CardApplicationISDR.get_eid(scc)
# TODO: Store EID identity?
class CardProfileEuiccSGP02(CardProfileUICC):
ORDER = 7
def __init__(self):
super().__init__(name='M2M eUICC (SGP.02)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
scc.cla_byte = "00"
scc.select_adf(AID_ECASD)
scc.get_data(0x5a)
# TODO: Store EID identity?

View File

@@ -16,7 +16,7 @@ from pySim.legacy.ts_51_011 import EF, DF
from pySim.legacy.ts_31_102 import EF_USIM_ADF_map
from pySim.legacy.ts_31_103 import EF_ISIM_ADF_map
from pySim.ts_51_011 import EF_AD, EF_SPN
from pySim.profile.ts_51_011 import EF_AD, EF_SPN
def format_addr(addr: str, addr_type: str) -> str:
"""

View File

@@ -148,7 +148,7 @@ def dec_st(st, table="sim") -> str:
from pySim.ts_31_102 import EF_UST_map
lookup_map = EF_UST_map
else:
from pySim.ts_51_011 import EF_SST_map
from pySim.profile.ts_51_011 import EF_SST_map
lookup_map = EF_SST_map
st_bytes = [st[i:i+2] for i in range(0, len(st), 2)]

View File

@@ -25,9 +25,9 @@ from osmocom.construct import *
from pySim.filesystem import *
from pySim.profile import CardProfile, CardProfileAddon
from pySim.ts_51_011 import CardProfileSIM
from pySim.ts_51_011 import DF_TELECOM, DF_GSM
from pySim.ts_51_011 import EF_ServiceTable
from pySim.profile.ts_51_011 import CardProfileSIM
from pySim.profile.ts_51_011 import DF_TELECOM, DF_GSM
from pySim.profile.ts_51_011 import EF_ServiceTable
# Mapping between CDMA Service Number and its description

58
pySim/profile/euicc.py Normal file
View File

@@ -0,0 +1,58 @@
# Copyright (C) 2023 Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.profile.ts_102_221 import CardProfileUICC
from pySim.commands import SimCardCommands
from pySim.euicc import CardApplicationISDR, AID_ISD_R, AID_ECASD, GetCertsReq, GetCertsResp
class CardProfileEuiccSGP32(CardProfileUICC):
ORDER = 5
def __init__(self):
super().__init__(name='IoT eUICC (SGP.32)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
# try a command only supported by SGP.32
scc.cla_byte = "00"
scc.select_adf(AID_ISD_R)
CardApplicationISDR.store_data_tlv(scc, GetCertsReq(), GetCertsResp)
class CardProfileEuiccSGP22(CardProfileUICC):
ORDER = 6
def __init__(self):
super().__init__(name='Consumer eUICC (SGP.22)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
# try to read EID from ISD-R
scc.cla_byte = "00"
scc.select_adf(AID_ISD_R)
eid = CardApplicationISDR.get_eid(scc)
# TODO: Store EID identity?
class CardProfileEuiccSGP02(CardProfileUICC):
ORDER = 7
def __init__(self):
super().__init__(name='M2M eUICC (SGP.02)')
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
scc.cla_byte = "00"
scc.select_adf(AID_ECASD)
scc.get_data(0x5a)
# TODO: Store EID identity?

394
pySim/profile/ts_102_221.py Normal file
View File

@@ -0,0 +1,394 @@
# coding=utf-8
"""Card Profile of ETSI TS 102 221, the core UICC spec.
(C) 2021-2024 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from construct import Struct, FlagsEnum, GreedyString
from osmocom.construct import *
from osmocom.utils import *
from osmocom.tlv import BER_TLV_IE
from pySim.utils import *
from pySim.filesystem import *
from pySim.profile import CardProfile
from pySim import iso7816_4
from pySim.ts_102_221 import decode_select_response, ts_102_22x_cmdset
from pySim.ts_102_221 import AM_DO_EF, SC_DO, AdditionalInterfacesSupport, AdditionalTermCapEuicc
from pySim.ts_102_221 import TerminalPowerSupply, ExtendedLchanTerminalSupport, TerminalCapability
# A UICC will usually also support 2G functionality. If this is the case, we
# need to add DF_GSM and DF_TELECOM along with the UICC related files
from pySim.profile.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
from pySim.profile.gsm_r import AddonGSMR
from pySim.profile.cdma_ruim import AddonRUIM
# TS 102 221 Section 13.1
class EF_DIR(LinFixedEF):
_test_de_encode = [
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
{ "application_label": "USim1" },
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
),
( '61194f10a0000000871004ffffffff890709000050054953696d31',
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
{ "application_label": "ISim1" } ] }
),
]
class ApplicationLabel(BER_TLV_IE, tag=0x50):
# TODO: UCS-2 coding option as per Annex A of TS 102 221
_construct = GreedyString('ascii')
# see https://github.com/PyCQA/pylint/issues/5794
#pylint: disable=undefined-variable
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
iso7816_4.ApplicationRelatedDOSet]):
pass
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
self._tlv = EF_DIR.ApplicationTemplate
# TS 102 221 Section 13.4
class EF_ARR(LinFixedEF):
_test_de_encode = [
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
[ [ { "access_mode": [ "read_search_compare" ] },
{ "control_reference_template": "ADM1" } ],
[ { "access_mode": [ "write_append", "update_erase" ] },
{ "always": None } ],
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
{ "never": None } ],
[ { "command_header": { "INS": 212 } },
{ "control_reference_template": "ADM1" } ]
] ),
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
[ [ { "access_mode": [ "read_search_compare" ] },
{ "always": None } ],
[ { "access_mode": [ "update_erase" ] },
{ "never": None } ],
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
{ "control_reference_template": "ADM1" } ],
[ { "command_header": { "INS": 212 } },
{ "control_reference_template": "ADM1" } ]
] ),
]
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
super().__init__(fid, sfid=sfid, name=name, desc=desc)
# add those commands to the general commands of a TransparentEF
self.shell_commands += [self.AddlShellCommands()]
@staticmethod
def flatten(inp: list):
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
def sc_abbreviate(sc):
if 'always' in sc:
return 'always'
elif 'never' in sc:
return 'never'
elif 'control_reference_template' in sc:
return sc['control_reference_template']
else:
return sc
by_mode = {}
for t in inp:
am = t[0]
sc = t[1]
sc_abbr = sc_abbreviate(sc)
if 'access_mode' in am:
for m in am['access_mode']:
by_mode[m] = sc_abbr
elif 'command_header' in am:
ins = am['command_header']['INS']
if 'CLA' in am['command_header']:
cla = am['command_header']['CLA']
else:
cla = None
cmd = ts_102_22x_cmdset.lookup(ins, cla)
if cmd:
name = cmd.name.lower().replace(' ', '_')
by_mode[name] = sc_abbr
else:
raise ValueError
else:
raise ValueError
return by_mode
def _decode_record_bin(self, raw_bin_data, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
dec = arr_seq.decode_multi(raw_bin_data)
# we cannot pass the result through flatten() here, as we don't have a related
# 'un-flattening' decoder, and hence would be unable to encode :(
return dec[0]
def _encode_record_bin(self, in_json, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
return arr_seq.encode_multi(in_json)
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline)
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
def do_read_arr_records(self, opts):
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
# collect all results in list so they are rendered as JSON list when printing
data_list = []
for recnr in range(1, 1 + num_of_rec):
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
data = self._cmd.lchan.selected_file.flatten(data)
data_list.append(data)
self._cmd.poutput_json(data_list, opts.oneline)
# TS 102 221 Section 13.6
class EF_UMPC(TransparentEF):
_test_de_encode = [
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
"support_uicc_suspend": False } } ),
]
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
support_uicc_suspend=2)
self._construct = Struct(
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
class CardProfileUICC(CardProfile):
ORDER = 10
def __init__(self, name='UICC'):
files = [
EF_DIR(),
EF_ICCID(),
EF_PL(),
EF_ARR(),
# FIXME: DF.CD
EF_UMPC(),
]
addons = [
AddonSIM,
AddonGSMR,
AddonRUIM,
]
sw = {
'Normal': {
'9000': 'Normal ending of the command',
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
},
'Postponed processing': {
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
},
'Warnings': {
'6200': 'No information given, state of non-volatile memory unchanged',
'6281': 'Part of returned data may be corrupted',
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
'6284': 'Selected file in termination state',
'62f1': 'More data available',
'62f2': 'More data available and proactive command pending',
'62f3': 'Response data available',
'63f1': 'More data expected',
'63f2': 'More data expected and proactive command pending',
'63cx': 'Command successful but after using an internal update retry routine X times',
},
'Execution errors': {
'6400': 'No information given, state of non-volatile memory unchanged',
'6500': 'No information given, state of non-volatile memory changed',
'6581': 'Memory problem',
},
'Checking errors': {
'6700': 'Wrong length',
'67xx': 'The interpretation of this status word is command dependent',
'6b00': 'Wrong parameter(s) P1-P2',
'6d00': 'Instruction code not supported or invalid',
'6e00': 'Class not supported',
'6f00': 'Technical problem, no precise diagnosis',
'6fxx': 'The interpretation of this status word is command dependent',
},
'Functions in CLA not supported': {
'6800': 'No information given',
'6881': 'Logical channel not supported',
'6882': 'Secure messaging not supported',
},
'Command not allowed': {
'6900': 'No information given',
'6981': 'Command incompatible with file structure',
'6982': 'Security status not satisfied',
'6983': 'Authentication/PIN method blocked',
'6984': 'Referenced data invalidated',
'6985': 'Conditions of use not satisfied',
'6986': 'Command not allowed (no EF selected)',
'6989': 'Command not allowed - secure channel - security not satisfied',
},
'Wrong parameters': {
'6a80': 'Incorrect parameters in the data field',
'6a81': 'Function not supported',
'6a82': 'File not found',
'6a83': 'Record not found',
'6a84': 'Not enough memory space',
'6a86': 'Incorrect parameters P1 to P2',
'6a87': 'Lc inconsistent with P1 to P2',
'6a88': 'Referenced data not found',
},
'Application errors': {
'9850': 'INCREASE cannot be performed, max value reached',
'9862': 'Authentication error, application specific',
'9863': 'Security session or association expired',
'9864': 'Minimum UICC suspension time is too long',
},
}
super().__init__(name, desc='ETSI TS 102 221', cla="00",
sel_ctrl="0004", files_in_mf=files, sw=sw,
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
@staticmethod
def decode_select_response(data_hex: str) -> object:
"""ETSI TS 102 221 Section 11.1.1.3"""
return decode_select_response(data_hex)
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
card is considered a UICC card."""
cls._mf_select_test(scc, "00", "0004", ["3f00"])
@with_default_category('TS 102 221 Specific Commands')
class AddlShellCommands(CommandSet):
suspend_uicc_parser = argparse.ArgumentParser()
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
help='Proposed minimum duration of suspension')
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
help='Proposed maximum duration of suspension')
# not ISO7816-4 but TS 102 221
@cmd2.with_argparser(suspend_uicc_parser)
def do_suspend_uicc(self, opts):
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
max_len_secs=opts.max_duration_secs)
self._cmd.poutput(
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
resume_uicc_parser = argparse.ArgumentParser()
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
@cmd2.with_argparser(resume_uicc_parser)
def do_resume_uicc(self, opts):
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
self._cmd.card._scc.resume_uicc(opts.TOKEN)
term_cap_parser = argparse.ArgumentParser()
# power group
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
help='Actual used Supply voltage class')
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
help='Maximum available power supply of the terminal')
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
help='Actual used clock frequency (in units of 100kHz)')
# no separate groups for those two
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
help='Extended Logical Channel supported')
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
help='Local User Interface in the Device (LUId) supported')
# eUICC group
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
tc_euicc_grp.add_argument('--lui-d', action='store_true',
help='Local User Interface in the Device (LUId) supported')
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
help='Local Profile Download in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lds-d', action='store_true',
help='Local Discovery Service in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
help='LUIe based on SCWS supported')
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
help='Metadata update alerting supported')
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
help='Enterprise Capable Device')
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
help='LUIe using E4E (ENVELOPE tag E4) supported')
tc_euicc_grp.add_argument('--lpr', action='store_true',
help='LPR (LPA Proxy) supported')
@cmd2.with_argparser(term_cap_parser)
def do_terminal_capability(self, opts):
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
ps_flags = {}
addl_if_flags = {}
euicc_flags = {}
opts_dict = vars(opts)
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
if any(opts_dict[x] for x in power_items):
if not all(opts_dict[x] for x in power_items):
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
for k, v in opts_dict.items():
if k in AdditionalInterfacesSupport._construct.flags.keys():
addl_if_flags[k] = v
elif k in AdditionalTermCapEuicc._construct.flags.keys():
euicc_flags[k] = v
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
if k == 'used_supply_voltage_class' and v:
v = {v: True}
ps_flags[k] = v
child_list = []
if any(x for x in ps_flags.values()):
child_list.append(TerminalPowerSupply(decoded=ps_flags))
if opts.extended_logical_channel:
child_list.append(ExtendedLchanTerminalSupport())
if any(x for x in addl_if_flags.values()):
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
if any(x for x in euicc_flags.values()):
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
print(child_list)
tc = TerminalCapability(children=child_list)
self.terminal_capability(b2h(tc.to_tlv()))
def terminal_capability(self, data:Hexstr):
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)

View File

@@ -43,7 +43,7 @@ from pySim.utils import dec_iccid, enc_iccid, dec_imsi, enc_imsi, dec_plmn, enc_
from pySim.profile import CardProfile, CardProfileAddon
from pySim.filesystem import *
from pySim.ts_31_102_telecom import DF_PHONEBOOK, DF_MULTIMEDIA, DF_MCS, DF_V2X
from pySim.gsm_r import AddonGSMR
from pySim.profile.gsm_r import AddonGSMR
# Mapping between SIM Service Number and its description
EF_SST_map = {

View File

@@ -25,6 +25,7 @@ from osmocom.utils import *
from osmocom.construct import *
from pySim.filesystem import *
from pySim.profile.ts_102_221 import CardProfileUICC
from pySim.runtime import RuntimeState
import pySim
@@ -180,7 +181,7 @@ class DF_SYSTEM(CardDF):
self.add_files(files)
def decode_select_response(self, resp_hex):
return pySim.ts_102_221.CardProfileUICC.decode_select_response(resp_hex)
return CardProfileUICC.decode_select_response(resp_hex)
class EF_USIM_SQN(TransparentEF):

View File

@@ -328,11 +328,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
from pySim.transport.pcsc import PcscSimLink
from pySim.transport.modem_atcmd import ModemATCommandLink
from pySim.transport.calypso import CalypsoSimLink
from pySim.transport.wsrc import WsrcSimLink
SerialSimLink.argparse_add_reader_args(arg_parser)
PcscSimLink.argparse_add_reader_args(arg_parser)
ModemATCommandLink.argparse_add_reader_args(arg_parser)
CalypsoSimLink.argparse_add_reader_args(arg_parser)
WsrcSimLink.argparse_add_reader_args(arg_parser)
arg_parser.add_argument('--apdu-trace', action='store_true',
help='Trace the command/response APDUs exchanged with the card')
@@ -355,6 +357,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
elif opts.modem_dev is not None:
from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs)
elif opts.wsrc_server_url is not None:
from pySim.transport.wsrc import WsrcSimLink
sl = WsrcSimLink(opts, **kwargs)
else: # Serial reader is default
print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink

99
pySim/transport/wsrc.py Normal file
View File

@@ -0,0 +1,99 @@
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from typing import Optional
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
from pySim.transport import LinkBase
from pySim.utils import ResTuple
from pySim.wsrc import WSRC_DEFAULT_PORT_USER
from pySim.wsrc.client_blocking import WsClientBlocking
class UserWsClientBlocking(WsClientBlocking):
def __init__(self, ws_uri: str, **kwargs):
super().__init__('user', ws_uri, **kwargs)
def select_card(self, id_type:str, id_str:str):
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
'select_card_ack')
return rx
def reset_card(self):
self.transceive_json('reset_req', {}, 'reset_resp')
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return rx['response'], rx['sw']
class WsrcSimLink(LinkBase):
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
name = 'WSRC'
def __init__(self, opts: argparse.Namespace, **kwargs):
super().__init__(**kwargs)
self.identities = {}
self.server_url = opts.wsrc_server_url
if opts.wsrc_eid:
self.id_type = 'eid'
self.id_str = opts.wsrc_eid
elif opts.wsrc_iccid:
self.id_type = 'iccid'
self.id_str = opts.wsrc_iccid
self.client = UserWsClientBlocking(self.server_url)
self.client.connect()
def __del__(self):
# FIXME: disconnect from server
pass
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
self.connect()
def connect(self):
rx = self.client.select_card(self.id_type, self.id_str)
self.identities = rx['identities']
def get_atr(self) -> Hexstr:
return h2i(self.identities['ATR'])
def disconnect(self):
self.__delete__()
def _reset_card(self):
self.client.reset_card()
return 1
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
return self.client.xceive_apdu_raw(pdu)
def __str__(self) -> str:
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
@staticmethod
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
can be accessed via a network.""")
wsrc_group.add_argument('--wsrc-server-url', default='ws://localhost:%u' % WSRC_DEFAULT_PORT_USER,
help='URI of the WSRC server to connect to')
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
help='ICCID of the card to open via WSRC')
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
help='EID of the card to open via WSRC')

View File

@@ -18,22 +18,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from bidict import bidict
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, GreedyString, ValidationError
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, ValidationError
from construct import Optional as COptional, Computed
from osmocom.construct import *
from osmocom.utils import *
from osmocom.tlv import *
from osmocom.tlv import BER_TLV_IE, flatten_dict_lists
from pySim.utils import *
from pySim.filesystem import *
from pySim.profile import CardProfile
from pySim import iso7816_4
# A UICC will usually also support 2G functionality. If this is the case, we
# need to add DF_GSM and DF_TELECOM along with the UICC related files
from pySim.ts_51_011 import AddonSIM, EF_ICCID, EF_PL
from pySim.gsm_r import AddonGSMR
from pySim.cdma_ruim import AddonRUIM
#from pySim.filesystem import *
#from pySim.profile import CardProfile
ts_102_22x_cmdset = CardCommandSet('TS 102 22x', [
# TS 102 221 Section 10.1.2 Table 10.5 "Coding of Instruction Byte"
@@ -288,6 +282,12 @@ class FcpTemplate(BER_TLV_IE, tag=0x62, nested=[FileSize, TotalFileSize, FileDes
PinStatusTemplate_DO]):
pass
def decode_select_response(data_hex: str) -> object:
"""ETSI TS 102 221 Section 11.1.1.3"""
t = FcpTemplate()
t.from_tlv(h2b(data_hex))
d = t.to_dict()
return flatten_dict_lists(d['fcp_template'])
def tlv_key_replace(inmap, indata):
def newkey(inmap, key):
@@ -634,361 +634,3 @@ NOT_DO = Nested_DO('not', 0xaf, NOT_Template)
SC_DO = DataObjectChoice('security_condition', 'Security Condition',
members=[Always_DO, Never_DO, SecCondByte_DO(), SecCondByte_DO(0x9e), CRT_DO(),
OR_DO, AND_DO, NOT_DO])
# TS 102 221 Section 13.1
class EF_DIR(LinFixedEF):
_test_de_encode = [
( '61294f10a0000000871002ffffffff890709000050055553696d31730ea00c80011781025f608203454150',
{ "application_template": [ { "application_id": h2b("a0000000871002ffffffff8907090000") },
{ "application_label": "USim1" },
{ "discretionary_template": h2b("a00c80011781025f608203454150") } ] }
),
( '61194f10a0000000871004ffffffff890709000050054953696d31',
{ "application_template": [ { "application_id": h2b("a0000000871004ffffffff8907090000") },
{ "application_label": "ISim1" } ] }
),
]
class ApplicationLabel(BER_TLV_IE, tag=0x50):
# TODO: UCS-2 coding option as per Annex A of TS 102 221
_construct = GreedyString('ascii')
# see https://github.com/PyCQA/pylint/issues/5794
#pylint: disable=undefined-variable
class ApplicationTemplate(BER_TLV_IE, tag=0x61,
nested=[iso7816_4.ApplicationId, ApplicationLabel, iso7816_4.FileReference,
iso7816_4.CommandApdu, iso7816_4.DiscretionaryData,
iso7816_4.DiscretionaryTemplate, iso7816_4.URL,
iso7816_4.ApplicationRelatedDOSet]):
pass
def __init__(self, fid='2f00', sfid=0x1e, name='EF.DIR', desc='Application Directory'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(5, 54))
self._tlv = EF_DIR.ApplicationTemplate
# TS 102 221 Section 13.4
class EF_ARR(LinFixedEF):
_test_de_encode = [
( '800101a40683010a950108800106900080016097008401d4a40683010a950108',
[ [ { "access_mode": [ "read_search_compare" ] },
{ "control_reference_template": "ADM1" } ],
[ { "access_mode": [ "write_append", "update_erase" ] },
{ "always": None } ],
[ { "access_mode": [ "delete_file", "terminate_ef" ] },
{ "never": None } ],
[ { "command_header": { "INS": 212 } },
{ "control_reference_template": "ADM1" } ]
] ),
( '80010190008001029700800118a40683010a9501088401d4a40683010a950108',
[ [ { "access_mode": [ "read_search_compare" ] },
{ "always": None } ],
[ { "access_mode": [ "update_erase" ] },
{ "never": None } ],
[ { "access_mode": [ "activate_file_or_record", "deactivate_file_or_record" ] },
{ "control_reference_template": "ADM1" } ],
[ { "command_header": { "INS": 212 } },
{ "control_reference_template": "ADM1" } ]
] ),
]
def __init__(self, fid='2f06', sfid=0x06, name='EF.ARR', desc='Access Rule Reference'):
super().__init__(fid, sfid=sfid, name=name, desc=desc)
# add those commands to the general commands of a TransparentEF
self.shell_commands += [self.AddlShellCommands()]
@staticmethod
def flatten(inp: list):
"""Flatten the somewhat deep/complex/nested data returned from decoder."""
def sc_abbreviate(sc):
if 'always' in sc:
return 'always'
elif 'never' in sc:
return 'never'
elif 'control_reference_template' in sc:
return sc['control_reference_template']
else:
return sc
by_mode = {}
for t in inp:
am = t[0]
sc = t[1]
sc_abbr = sc_abbreviate(sc)
if 'access_mode' in am:
for m in am['access_mode']:
by_mode[m] = sc_abbr
elif 'command_header' in am:
ins = am['command_header']['INS']
if 'CLA' in am['command_header']:
cla = am['command_header']['CLA']
else:
cla = None
cmd = ts_102_22x_cmdset.lookup(ins, cla)
if cmd:
name = cmd.name.lower().replace(' ', '_')
by_mode[name] = sc_abbr
else:
raise ValueError
else:
raise ValueError
return by_mode
def _decode_record_bin(self, raw_bin_data, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
dec = arr_seq.decode_multi(raw_bin_data)
# we cannot pass the result through flatten() here, as we don't have a related
# 'un-flattening' decoder, and hence would be unable to encode :(
return dec[0]
def _encode_record_bin(self, in_json, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
return arr_seq.encode_multi(in_json)
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline)
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_recs_dec_parser)
def do_read_arr_records(self, opts):
"""Read + decode all EF.ARR records in flattened, human-friendly form."""
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
# collect all results in list so they are rendered as JSON list when printing
data_list = []
for recnr in range(1, 1 + num_of_rec):
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
data = self._cmd.lchan.selected_file.flatten(data)
data_list.append(data)
self._cmd.poutput_json(data_list, opts.oneline)
# TS 102 221 Section 13.6
class EF_UMPC(TransparentEF):
_test_de_encode = [
( '3cff02', { "max_current_mA": 60, "t_op_s": 255,
"addl_info": { "req_inc_idle_current": False, "support_uicc_suspend": True } } ),
( '320500', { "max_current_mA": 50, "t_op_s": 5, "addl_info": {"req_inc_idle_current": False,
"support_uicc_suspend": False } } ),
]
def __init__(self, fid='2f08', sfid=0x08, name='EF.UMPC', desc='UICC Maximum Power Consumption'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=(5, 5))
addl_info = FlagsEnum(Byte, req_inc_idle_current=1,
support_uicc_suspend=2)
self._construct = Struct(
'max_current_mA'/Int8ub, 't_op_s'/Int8ub, 'addl_info'/addl_info)
class CardProfileUICC(CardProfile):
ORDER = 10
def __init__(self, name='UICC'):
files = [
EF_DIR(),
EF_ICCID(),
EF_PL(),
EF_ARR(),
# FIXME: DF.CD
EF_UMPC(),
]
addons = [
AddonSIM,
AddonGSMR,
AddonRUIM,
]
sw = {
'Normal': {
'9000': 'Normal ending of the command',
'91xx': 'Normal ending of the command, with extra information from the proactive UICC containing a command for the terminal',
'92xx': 'Normal ending of the command, with extra information concerning an ongoing data transfer session',
},
'Postponed processing': {
'9300': 'SIM Application Toolkit is busy. Command cannot be executed at present, further normal commands are allowed',
},
'Warnings': {
'6200': 'No information given, state of non-volatile memory unchanged',
'6281': 'Part of returned data may be corrupted',
'6282': 'End of file/record reached before reading Le bytes or unsuccessful search',
'6283': 'Selected file invalidated/disabled; needs to be activated before use',
'6284': 'Selected file in termination state',
'62f1': 'More data available',
'62f2': 'More data available and proactive command pending',
'62f3': 'Response data available',
'63f1': 'More data expected',
'63f2': 'More data expected and proactive command pending',
'63cx': 'Command successful but after using an internal update retry routine X times',
},
'Execution errors': {
'6400': 'No information given, state of non-volatile memory unchanged',
'6500': 'No information given, state of non-volatile memory changed',
'6581': 'Memory problem',
},
'Checking errors': {
'6700': 'Wrong length',
'67xx': 'The interpretation of this status word is command dependent',
'6b00': 'Wrong parameter(s) P1-P2',
'6d00': 'Instruction code not supported or invalid',
'6e00': 'Class not supported',
'6f00': 'Technical problem, no precise diagnosis',
'6fxx': 'The interpretation of this status word is command dependent',
},
'Functions in CLA not supported': {
'6800': 'No information given',
'6881': 'Logical channel not supported',
'6882': 'Secure messaging not supported',
},
'Command not allowed': {
'6900': 'No information given',
'6981': 'Command incompatible with file structure',
'6982': 'Security status not satisfied',
'6983': 'Authentication/PIN method blocked',
'6984': 'Referenced data invalidated',
'6985': 'Conditions of use not satisfied',
'6986': 'Command not allowed (no EF selected)',
'6989': 'Command not allowed - secure channel - security not satisfied',
},
'Wrong parameters': {
'6a80': 'Incorrect parameters in the data field',
'6a81': 'Function not supported',
'6a82': 'File not found',
'6a83': 'Record not found',
'6a84': 'Not enough memory space',
'6a86': 'Incorrect parameters P1 to P2',
'6a87': 'Lc inconsistent with P1 to P2',
'6a88': 'Referenced data not found',
},
'Application errors': {
'9850': 'INCREASE cannot be performed, max value reached',
'9862': 'Authentication error, application specific',
'9863': 'Security session or association expired',
'9864': 'Minimum UICC suspension time is too long',
},
}
super().__init__(name, desc='ETSI TS 102 221', cla="00",
sel_ctrl="0004", files_in_mf=files, sw=sw,
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
@staticmethod
def decode_select_response(data_hex: str) -> object:
"""ETSI TS 102 221 Section 11.1.1.3"""
t = FcpTemplate()
t.from_tlv(h2b(data_hex))
d = t.to_dict()
return flatten_dict_lists(d['fcp_template'])
@classmethod
def _try_match_card(cls, scc: SimCardCommands) -> None:
""" Try to access MF via UICC APDUs (3GPP TS 102.221), if this works, the
card is considered a UICC card."""
cls._mf_select_test(scc, "00", "0004", ["3f00"])
@with_default_category('TS 102 221 Specific Commands')
class AddlShellCommands(CommandSet):
suspend_uicc_parser = argparse.ArgumentParser()
suspend_uicc_parser.add_argument('--min-duration-secs', type=int, default=60,
help='Proposed minimum duration of suspension')
suspend_uicc_parser.add_argument('--max-duration-secs', type=int, default=24*60*60,
help='Proposed maximum duration of suspension')
# not ISO7816-4 but TS 102 221
@cmd2.with_argparser(suspend_uicc_parser)
def do_suspend_uicc(self, opts):
"""Perform the SUSPEND UICC command. Only supported on some UICC (check EF.UMPC)."""
(duration, token, sw) = self._cmd.card._scc.suspend_uicc(min_len_secs=opts.min_duration_secs,
max_len_secs=opts.max_duration_secs)
self._cmd.poutput(
'Negotiated Duration: %u secs, Token: %s, SW: %s' % (duration, token, sw))
resume_uicc_parser = argparse.ArgumentParser()
resume_uicc_parser.add_argument('TOKEN', type=str, help='Token provided during SUSPEND')
@cmd2.with_argparser(resume_uicc_parser)
def do_resume_uicc(self, opts):
"""Perform the REUSME UICC operation. Only supported on some UICC. Also: A power-cycle
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
self._cmd.card._scc.resume_uicc(opts.TOKEN)
term_cap_parser = argparse.ArgumentParser()
# power group
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
help='Actual used Supply voltage class')
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
help='Maximum available power supply of the terminal')
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
help='Actual used clock frequency (in units of 100kHz)')
# no separate groups for those two
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
help='Extended Logical Channel supported')
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
help='Local User Interface in the Device (LUId) supported')
# eUICC group
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
tc_euicc_grp.add_argument('--lui-d', action='store_true',
help='Local User Interface in the Device (LUId) supported')
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
help='Local Profile Download in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lds-d', action='store_true',
help='Local Discovery Service in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
help='LUIe based on SCWS supported')
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
help='Metadata update alerting supported')
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
help='Enterprise Capable Device')
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
help='LUIe using E4E (ENVELOPE tag E4) supported')
tc_euicc_grp.add_argument('--lpr', action='store_true',
help='LPR (LPA Proxy) supported')
@cmd2.with_argparser(term_cap_parser)
def do_terminal_capability(self, opts):
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
ps_flags = {}
addl_if_flags = {}
euicc_flags = {}
opts_dict = vars(opts)
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
if any(opts_dict[x] for x in power_items):
if not all(opts_dict[x] for x in power_items):
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
for k, v in opts_dict.items():
if k in AdditionalInterfacesSupport._construct.flags.keys():
addl_if_flags[k] = v
elif k in AdditionalTermCapEuicc._construct.flags.keys():
euicc_flags[k] = v
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
if k == 'used_supply_voltage_class' and v:
v = {v: True}
ps_flags[k] = v
child_list = []
if any(x for x in ps_flags.values()):
child_list.append(TerminalPowerSupply(decoded=ps_flags))
if opts.extended_logical_channel:
child_list.append(ExtendedLchanTerminalSupport())
if any(x for x in addl_if_flags.values()):
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
if any(x for x in euicc_flags.values()):
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
print(child_list)
tc = TerminalCapability(children=child_list)
self.terminal_capability(b2h(tc.to_tlv()))
def terminal_capability(self, data:Hexstr):
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)

View File

@@ -36,14 +36,13 @@ from osmocom.utils import is_hexstr
from osmocom.tlv import *
from osmocom.construct import *
import pySim.ts_102_221
from pySim.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
from pySim.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
from pySim.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
from pySim.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
from pySim.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
from pySim.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
from pySim.ts_102_221 import EF_ARR
from pySim.profile.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
from pySim.profile.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
from pySim.profile.ts_51_011 import EF_SMSR, EF_DCK, EF_EXT, EF_CNL, EF_OPL, EF_MBI, EF_MWIS
from pySim.profile.ts_51_011 import EF_CBMID, EF_CBMIR, EF_ADN, EF_CFIS, EF_SMS, EF_MSISDN, EF_SMSP, EF_SMSS
from pySim.profile.ts_51_011 import EF_IMSI, EF_xPLMNwAcT, EF_SPN, EF_CBMI, EF_ACC, EF_PLMNsel
from pySim.profile.ts_51_011 import EF_Kc, EF_CPBCCH, EF_InvScan
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
from pySim.filesystem import *
from pySim.ts_31_102_telecom import DF_PHONEBOOK, EF_UServiceTable
from pySim.ts_31_103_shared import EF_IMSConfigData, EF_XCAPConfigData, EF_MuDMiDConfigData
@@ -1750,7 +1749,7 @@ class ADF_USIM(CardADF):
self.add_files(files)
def decode_select_response(self, data_hex):
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
return CardProfileUICC.decode_select_response(data_hex)
@with_default_category('Application-Specific Commands')
class AddlShellCommands(CommandSet):

View File

@@ -27,12 +27,11 @@ from osmocom.utils import *
from osmocom.tlv import *
from osmocom.construct import *
from pySim.filesystem import *
from pySim.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
from pySim.profile.ts_51_011 import EF_AD, EF_SMS, EF_SMSS, EF_SMSR, EF_SMSP
from pySim.ts_31_102 import ADF_USIM, EF_FromPreferred
from pySim.ts_31_102_telecom import EF_UServiceTable
from pySim.ts_31_103_shared import *
import pySim.ts_102_221
from pySim.ts_102_221 import EF_ARR
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
# Mapping between ISIM Service Number and its description
EF_IST_map = {
@@ -226,7 +225,7 @@ class ADF_ISIM(CardADF):
self.shell_commands += [ADF_USIM.AddlShellCommands()]
def decode_select_response(self, data_hex):
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
return CardProfileUICC.decode_select_response(data_hex)
# TS 31.103 Section 7.1

View File

@@ -24,9 +24,8 @@ from osmocom.utils import *
from osmocom.tlv import *
from pySim.filesystem import *
from pySim.ts_31_102 import ADF_USIM
from pySim.ts_51_011 import EF_IMSI, EF_AD
import pySim.ts_102_221
from pySim.ts_102_221 import EF_ARR
from pySim.profile.ts_51_011 import EF_IMSI, EF_AD
from pySim.profile.ts_102_221 import EF_ARR, CardProfileUICC
class ADF_HPSIM(CardADF):
@@ -44,7 +43,7 @@ class ADF_HPSIM(CardADF):
self.shell_commands += [ADF_USIM.AddlShellCommands()]
def decode_select_response(self, data_hex):
return pySim.ts_102_221.CardProfileUICC.decode_select_response(data_hex)
return CardProfileUICC.decode_select_response(data_hex)
# TS 31.104 Section 7.1

3
pySim/wsrc/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
WSRC_DEFAULT_PORT_USER = 4220
WSRC_DEFAULT_PORT_CARD = 4221

View File

@@ -0,0 +1,68 @@
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import abc
import json
import logging
from typing import Optional
from websockets.sync.client import connect
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
class WsClientBlocking(abc.ABC):
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
def __init__(self, cltype: str, ws_uri: str):
self.client_type = cltype
self.ws_uri = ws_uri
self.ws = None
def connect(self):
self.ws = connect(uri=self.ws_uri)
self.perform_outbound_hello()
def tx_json(self, msg_type: str, d: dict = {}):
"""JSON-Encode and transmit a message to the given websocket."""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
self.ws.send(d_js)
def tx_error(self, message: str):
event = {
'message': message,
}
self.tx_json('error', event)
def rx_json(self):
"""Receive a single message from the given websocket and JSON-decode it."""
rx = self.ws.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
self.tx_json(tx_msg_type, tx_d)
rx = self.rx_json()
assert rx['msg_type'] == rx_msg_type
return rx
def perform_outbound_hello(self, tx: dict = {}):
if not 'client_type' in tx:
tx['client_type'] = self.client_type
self.tx_json('hello', tx)
rx = self.rx_json()
assert rx['msg_type'] == 'hello_ack'
return rx
def rx_and_execute_cmd(self):
"""Receve and dispatch/execute a single command from the server."""
rx = self.rx_json()
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
if handler:
handler(rx)
else:
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])

View File

@@ -25,10 +25,10 @@ import pySim.ts_102_221
import pySim.ts_102_222
import pySim.ts_31_102
import pySim.ts_31_103
import pySim.ts_51_011
import pySim.profile.ts_51_011
import pySim.sysmocom_sja2
import pySim.gsm_r
import pySim.cdma_ruim
import pySim.profile.gsm_r
import pySim.profile.cdma_ruim
from construct import Int8ub, Struct, Padding, this
from osmocom.tlv import BER_TLV_IE

View File

@@ -26,10 +26,10 @@ import pySim.ts_102_221
import pySim.ts_102_222
import pySim.ts_31_102
import pySim.ts_31_103
import pySim.ts_51_011
import pySim.profile.ts_51_011
import pySim.sysmocom_sja2
import pySim.gsm_r
import pySim.cdma_ruim
import pySim.profile.gsm_r
import pySim.profile.cdma_ruim
import pySim.global_platform
import pySim.global_platform.http