WIP: initial step towards websocket-based remote card [reader] access

Change-Id: I588bf4b3d9891766dd688a6818057ca20fb26e3f
This commit is contained in:
Harald Welte
2024-09-08 11:50:04 +02:00
parent de8cc322f1
commit 671b0f19b6
8 changed files with 699 additions and 0 deletions

View File

@@ -328,11 +328,13 @@ def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
from pySim.transport.pcsc import PcscSimLink
from pySim.transport.modem_atcmd import ModemATCommandLink
from pySim.transport.calypso import CalypsoSimLink
from pySim.transport.wsrc import WsrcSimLink
SerialSimLink.argparse_add_reader_args(arg_parser)
PcscSimLink.argparse_add_reader_args(arg_parser)
ModemATCommandLink.argparse_add_reader_args(arg_parser)
CalypsoSimLink.argparse_add_reader_args(arg_parser)
WsrcSimLink.argparse_add_reader_args(arg_parser)
arg_parser.add_argument('--apdu-trace', action='store_true',
help='Trace the command/response APDUs exchanged with the card')
@@ -355,6 +357,9 @@ def init_reader(opts, **kwargs) -> LinkBase:
elif opts.modem_dev is not None:
from pySim.transport.modem_atcmd import ModemATCommandLink
sl = ModemATCommandLink(opts, **kwargs)
elif opts.wsrc_server_url is not None:
from pySim.transport.wsrc import WsrcSimLink
sl = WsrcSimLink(opts, **kwargs)
else: # Serial reader is default
print("No reader/driver specified; falling back to default (Serial reader)")
from pySim.transport.serial import SerialSimLink

99
pySim/transport/wsrc.py Normal file
View File

@@ -0,0 +1,99 @@
# Copyright (C) 2024 Harald Welte <laforge@gnumonks.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from typing import Optional
from osmocom.utils import h2i, i2h, Hexstr, is_hexstr
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
from pySim.transport import LinkBase
from pySim.utils import ResTuple
from pySim.wsrc import WSRC_DEFAULT_PORT_USER
from pySim.wsrc.client_blocking import WsClientBlocking
class UserWsClientBlocking(WsClientBlocking):
def __init__(self, ws_uri: str, **kwargs):
super().__init__('user', ws_uri, **kwargs)
def select_card(self, id_type:str, id_str:str):
rx = self.transceive_json('select_card', {'id_type': id_type, 'id_str': id_str},
'select_card_ack')
return rx
def reset_card(self):
self.transceive_json('reset_req', {}, 'reset_resp')
def xceive_apdu_raw(self, cmd: Hexstr) -> ResTuple:
rx = self.transceive_json('c_apdu', {'command': cmd}, 'r_apdu')
return rx['response'], rx['sw']
class WsrcSimLink(LinkBase):
""" pySim: WSRC (WebSocket Remote Card) reader transport link."""
name = 'WSRC'
def __init__(self, opts: argparse.Namespace, **kwargs):
super().__init__(**kwargs)
self.identities = {}
self.server_url = opts.wsrc_server_url
if opts.wsrc_eid:
self.id_type = 'eid'
self.id_str = opts.wsrc_eid
elif opts.wsrc_iccid:
self.id_type = 'iccid'
self.id_str = opts.wsrc_iccid
self.client = UserWsClientBlocking(self.server_url)
self.client.connect()
def __del__(self):
# FIXME: disconnect from server
pass
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
self.connect()
def connect(self):
rx = self.client.select_card(self.id_type, self.id_str)
self.identities = rx['identities']
def get_atr(self) -> Hexstr:
return h2i(self.identities['ATR'])
def disconnect(self):
self.__delete__()
def _reset_card(self):
self.client.reset_card()
return 1
def _send_apdu_raw(self, pdu: Hexstr) -> ResTuple:
return self.client.xceive_apdu_raw(pdu)
def __str__(self) -> str:
return "WSRC[%s=%s]" % (self.id_type, self.id_str)
@staticmethod
def argparse_add_reader_args(arg_parser: argparse.ArgumentParser):
wsrc_group = arg_parser.add_argument_group('WebSocket Remote Card',
"""WebSocket Remote Card (WSRC) is a protoocl by which remot cards / card readers
can be accessed via a network.""")
wsrc_group.add_argument('--wsrc-server-url', default='ws://localhost:%u' % WSRC_DEFAULT_PORT_USER,
help='URI of the WSRC server to connect to')
wsrc_group.add_argument('--wsrc-iccid', type=is_hexstr,
help='ICCID of the card to open via WSRC')
wsrc_group.add_argument('--wsrc-eid', type=is_hexstr,
help='EID of the card to open via WSRC')

3
pySim/wsrc/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
WSRC_DEFAULT_PORT_USER = 4220
WSRC_DEFAULT_PORT_CARD = 4221

View File

@@ -0,0 +1,68 @@
"""Connect smartcard to a remote server, so the remote server can take control and
perform commands on it."""
import abc
import json
import logging
from typing import Optional
from websockets.sync.client import connect
from osmocom.utils import b2h
logger = logging.getLogger(__name__)
class WsClientBlocking(abc.ABC):
"""Generalized synchronous/blocking client for the WSRC (Web Socket Remote Card) protocol"""
def __init__(self, cltype: str, ws_uri: str):
self.client_type = cltype
self.ws_uri = ws_uri
self.ws = None
def connect(self):
self.ws = connect(uri=self.ws_uri)
self.perform_outbound_hello()
def tx_json(self, msg_type: str, d: dict = {}):
"""JSON-Encode and transmit a message to the given websocket."""
d['msg_type'] = msg_type
d_js = json.dumps(d)
logger.debug("Tx: %s", d_js)
self.ws.send(d_js)
def tx_error(self, message: str):
event = {
'message': message,
}
self.tx_json('error', event)
def rx_json(self):
"""Receive a single message from the given websocket and JSON-decode it."""
rx = self.ws.recv()
rx_js = json.loads(rx)
logger.debug("Rx: %s", rx_js)
assert 'msg_type' in rx
return rx_js
def transceive_json(self, tx_msg_type: str, tx_d: Optional[dict], rx_msg_type: str) -> dict:
self.tx_json(tx_msg_type, tx_d)
rx = self.rx_json()
assert rx['msg_type'] == rx_msg_type
return rx
def perform_outbound_hello(self, tx: dict = {}):
if not 'client_type' in tx:
tx['client_type'] = self.client_type
self.tx_json('hello', tx)
rx = self.rx_json()
assert rx['msg_type'] == 'hello_ack'
return rx
def rx_and_execute_cmd(self):
"""Receve and dispatch/execute a single command from the server."""
rx = self.rx_json()
handler = getattr(self, 'handle_rx_%s' % rx['msg_type'], None)
if handler:
handler(rx)
else:
logger.error('Received unknown/unsupported msg_type %s' % rx['msg_type'])
self.tx_error('Message type "%s" is not supported' % rx['msg_type'])