Compare commits
3 Commits
master
...
daniel/ota
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3506940448 | ||
|
|
637276472d | ||
|
|
a105b55751 |
46
osmo-ras.py
Executable file
46
osmo-ras.py
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Remote Application Server for Remote Application Management over HTTP
|
||||
# See Amendment B of the GlobalPlatform Card Specification v2.2
|
||||
#
|
||||
# (C) 2025 sysmocom s.f.m.c.
|
||||
# Author: Daniel Willmann <dwillmann@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
||||
from ssl import PROTOCOL_TLS_SERVER, SSLContext, TLSVersion
|
||||
|
||||
|
||||
context = SSLContext(PROTOCOL_TLS_SERVER)
|
||||
context.maximum_version = TLSVersion.TLSv1_2
|
||||
CIPHERS_1_0 = "TLS_PSK_WITH_3DES_EDE_CBC_SHA,TLS_PSK_WITH_AES_128_CBC_SHA,TLS_PSK_WITH_NULL_SHA"
|
||||
CIPHERS_1_2 = "TLS_PSK_WITH_AES_128_CBC_SHA256,TLS_PSK_WITH_NULL_SHA256"
|
||||
context.set_ciphers(CIPHERS_1_2)
|
||||
|
||||
# A table using the identity of the client:
|
||||
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
|
||||
'ClientId_2': bytes.fromhex('facade')
|
||||
}
|
||||
|
||||
def get_psk(ident):
|
||||
""" Get the PSK for the client """
|
||||
print(f"Get PSK for {ident}")
|
||||
return psk_table.get(ident, b'')
|
||||
|
||||
context.set_psk_server_callback(get_psk)
|
||||
|
||||
server = HTTPServer(("0.0.0.0", 8080), SimpleHTTPRequestHandler)
|
||||
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||
server.serve_forever()
|
||||
@@ -53,7 +53,7 @@ from pySim.cards import UiccCardBase
|
||||
from pySim.exceptions import *
|
||||
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
|
||||
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
|
||||
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength
|
||||
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength, EventDownload, EventList
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -71,24 +71,46 @@ class MyApduTracer(ApduTracer):
|
||||
print("-> %s %s" % (cmd[:10], cmd[10:]))
|
||||
print("<- %s: %s" % (sw, resp))
|
||||
|
||||
class TcpProtocol(protocol.Protocol):
|
||||
def dataReceived(self, data):
|
||||
pass
|
||||
|
||||
def connectionLost(self, reason):
|
||||
pass
|
||||
|
||||
|
||||
def tcp_connected_callback(p: protocol.Protocol):
|
||||
"""called by twisted TCP client."""
|
||||
logger.error("%s: connected!" % p)
|
||||
for data in p.pending_tx:
|
||||
p.transport.write(data)
|
||||
|
||||
class ProactChannel:
|
||||
"""Representation of a single protective channel."""
|
||||
class ProactChannel(protocol.Protocol):
|
||||
"""Representation of a single proective channel."""
|
||||
def __init__(self, channels: 'ProactChannels', chan_nr: int):
|
||||
self.channels = channels
|
||||
self.chan_nr = chan_nr
|
||||
self.ep = None
|
||||
self.pending_tx = []
|
||||
self.pending_rx = bytearray()
|
||||
|
||||
def write(self, data: bytes):
|
||||
if self.connected:
|
||||
self.transport.write(data)
|
||||
else:
|
||||
self.pending_tx.append(data)
|
||||
|
||||
def dataReceived(self, data: bytes):
|
||||
logger.error(f"Got data (len={len(data)}): {data}")
|
||||
self.pending_rx.extend(data)
|
||||
# Send ENVELOPE with EventDownload Data available
|
||||
event_list_ie = EventList(decoded=[ EventList.Event.data_available])
|
||||
channel_status_ie = ChannelStatus(decoded='8100')
|
||||
channel_data_len_ie = ChannelDataLength(decoded=min(255,len(self.pending_rx)))
|
||||
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
|
||||
event_dl = EventDownload(children=[event_list_ie, dev_ids, channel_status_ie, channel_data_len_ie])
|
||||
# 3) send to the card
|
||||
envelope_hex = b2h(event_dl.to_tlv())
|
||||
logger.info("ENVELOPE Event: %s" % envelope_hex)
|
||||
global g_ms
|
||||
(data, sw) = g_ms.scc.envelope(envelope_hex)
|
||||
logger.info("SW %s: %s" % (sw, data))
|
||||
# FIXME: Handle result?!
|
||||
|
||||
def connectionLost(self, reason):
|
||||
logger.error("connection lost: %s" % reason)
|
||||
|
||||
def close(self):
|
||||
"""Close the channel."""
|
||||
@@ -174,14 +196,13 @@ class Proact(ProactiveHandler):
|
||||
raise ValueError('Unsupported protocol_type')
|
||||
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
|
||||
raise ValueError('Unsupported type_of_address')
|
||||
ipv4_bytes = h2b(other_addr_ie.decoded['address'])
|
||||
ipv4_bytes = other_addr_ie.decoded['address']
|
||||
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
|
||||
port_nr = transp_lvl_ie.decoded['port_number']
|
||||
print("%s:%u" % (ipv4_str, port_nr))
|
||||
logger.error("OpenChannel opening with %s:%u" % (ipv4_str, port_nr))
|
||||
channel = self.channels.channel_create()
|
||||
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
|
||||
channel.prot = TcpProtocol()
|
||||
d = endpoints.connectProtocol(channel.ep, channel.prot)
|
||||
d = endpoints.connectProtocol(channel.ep, channel)
|
||||
# FIXME: why is this never called despite the client showing the inbound connection?
|
||||
d.addCallback(tcp_connected_callback)
|
||||
|
||||
@@ -213,6 +234,17 @@ class Proact(ProactiveHandler):
|
||||
# ]}
|
||||
logger.info("ReceiveData")
|
||||
logger.info(pcmd)
|
||||
dev_id_ie = Proact._find_first_element_of_type(pcmd.children, DeviceIdentities)
|
||||
chan_data_len_ie = Proact._find_first_element_of_type(pcmd.children, ChannelDataLength)
|
||||
len_requested = chan_data_len_ie.decoded
|
||||
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||
chan_nr = 1 # FIXME
|
||||
chan = self.channels.channels.get(chan_nr, None)
|
||||
|
||||
requested = chan.pending_rx[:len_requested]
|
||||
chan.pending_rx = chan.pending_rx[len_requested:]
|
||||
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||
|
||||
# Terminal Response example: [
|
||||
# {'command_details': {'command_number': 1,
|
||||
# 'type_of_command': 'receive_data',
|
||||
@@ -222,7 +254,8 @@ class Proact(ProactiveHandler):
|
||||
# {'channel_data': '16030100040e000000'},
|
||||
# {'channel_data_length': 0}
|
||||
# ]
|
||||
return self.prepare_response(pcmd) + []
|
||||
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||
return resp
|
||||
|
||||
def handle_SendData(self, pcmd: ProactiveCommand):
|
||||
"""Send/write data received from the SIM to the socket."""
|
||||
@@ -240,7 +273,10 @@ class Proact(ProactiveHandler):
|
||||
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||
chan_nr = 1 # FIXME
|
||||
chan = self.channels.channels.get(chan_nr, None)
|
||||
# FIXME chan.prot.transport.write(h2b(chan_data_ie.decoded))
|
||||
# FIXME
|
||||
logger.error(f"Chan data received: {chan_data_ie.decoded}")
|
||||
chan.write(chan_data_ie.decoded)
|
||||
#chan.write(h2b(chan_data_ie.decoded))
|
||||
# Terminal Response example: [
|
||||
# {'command_details': {'command_number': 1,
|
||||
# 'type_of_command': 'send_data',
|
||||
@@ -425,4 +461,3 @@ if __name__ == '__main__':
|
||||
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
|
||||
g_ms.connect_to_card(tp)
|
||||
reactor.run()
|
||||
|
||||
|
||||
107
smpp_ota_apdu.py
Executable file
107
smpp_ota_apdu.py
Executable file
@@ -0,0 +1,107 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import sys
|
||||
from pprint import pprint as pp
|
||||
|
||||
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||
from pySim.utils import b2h, h2b
|
||||
|
||||
import smpplib.gsm
|
||||
import smpplib.client
|
||||
import smpplib.consts
|
||||
import argparse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# if you want to know what's happening
|
||||
logging.basicConfig(level='DEBUG')
|
||||
|
||||
|
||||
|
||||
class Foo:
|
||||
def smpp_rx_handler(self, pdu):
|
||||
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
|
||||
if pdu.short_message:
|
||||
try:
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||
except ValueError:
|
||||
spi = self.spi.copy()
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||
pp(dec)
|
||||
return None
|
||||
|
||||
def __init__(self, kic, kid, idx, tar):
|
||||
# Two parts, UCS2, SMS with UDH
|
||||
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
|
||||
|
||||
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
|
||||
|
||||
# Print when obtain message_id
|
||||
client.set_message_sent_handler(
|
||||
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
|
||||
#client.set_message_received_handler(
|
||||
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
|
||||
client.set_message_received_handler(self.smpp_rx_handler)
|
||||
|
||||
client.connect()
|
||||
client.bind_transceiver(system_id='test', password='test')
|
||||
|
||||
self.client = client
|
||||
|
||||
|
||||
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=idx, kic=h2b(kic),
|
||||
algo_auth='triple_des_cbc2', kid_idx=idx, kid=h2b(kid))
|
||||
self.ota_keyset.cntr = 0xdadb
|
||||
self.tar = h2b(tar)
|
||||
|
||||
self.ota_dialect = OtaDialectSms()
|
||||
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
|
||||
|
||||
def tx_sms_tpdu(self, tpdu: bytes):
|
||||
self.client.send_message(
|
||||
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure it is a byte string, not unicode:
|
||||
source_addr='12',
|
||||
|
||||
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||
# Make sure thease two params are byte strings, not unicode:
|
||||
destination_addr='23',
|
||||
short_message=tpdu,
|
||||
|
||||
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||
protocol_id=0x7f,
|
||||
#registered_delivery=True,
|
||||
)
|
||||
|
||||
def tx_c_apdu(self, apdu: bytes):
|
||||
logger.info("C-APDU: %s" % b2h(apdu))
|
||||
# translate to Secured OTA RFM
|
||||
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||
# add user data header
|
||||
tpdu = b'\x02\x70\x00' + secured
|
||||
# send via SMPP
|
||||
self.tx_sms_tpdu(tpdu)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument('-c', '--kic')
|
||||
parser.add_argument('-d', '--kid')
|
||||
parser.add_argument('-i', '--idx', type=int, default=1)
|
||||
parser.add_argument('-t', '--tar', default='b00011')
|
||||
parser.add_argument('apdu', default="", nargs='+')
|
||||
args = parser.parse_args()
|
||||
|
||||
f = Foo(args.kic, args.kid, args.idx, args.tar)
|
||||
print("initialized, sending APDU")
|
||||
f.tx_c_apdu(h2b("".join(args.apdu)))
|
||||
|
||||
f.client.listen()
|
||||
Reference in New Issue
Block a user