3 Commits

Author SHA1 Message Date
Daniel Willmann
3506940448 pySim-smpp2sim: Implement handle_send-/receivedata
Change-Id: Icec9265520a6ab20cb4764e9cab5cdab31841862
2026-01-23 00:38:17 +01:00
Daniel Willmann
637276472d Add script to send ota PDU via SMPP
Change-Id: Idda80d57c26a9b3a33766bd06fc1af54db56874c
2026-01-23 00:28:37 +01:00
Daniel Willmann
a105b55751 Add osmo-ras.py Server for RAM over HTTP
Change-Id: Ib8fdc8f00f0b5bcd3365eca49b611328343a7edb
2026-01-23 00:27:20 +01:00
3 changed files with 206 additions and 18 deletions

46
osmo-ras.py Executable file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
# Remote Application Server for Remote Application Management over HTTP
# See Amendment B of the GlobalPlatform Card Specification v2.2
#
# (C) 2025 sysmocom s.f.m.c.
# Author: Daniel Willmann <dwillmann@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from http.server import HTTPServer, SimpleHTTPRequestHandler
from ssl import PROTOCOL_TLS_SERVER, SSLContext, TLSVersion
context = SSLContext(PROTOCOL_TLS_SERVER)
context.maximum_version = TLSVersion.TLSv1_2
CIPHERS_1_0 = "TLS_PSK_WITH_3DES_EDE_CBC_SHA,TLS_PSK_WITH_AES_128_CBC_SHA,TLS_PSK_WITH_NULL_SHA"
CIPHERS_1_2 = "TLS_PSK_WITH_AES_128_CBC_SHA256,TLS_PSK_WITH_NULL_SHA256"
context.set_ciphers(CIPHERS_1_2)
# A table using the identity of the client:
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
'ClientId_2': bytes.fromhex('facade')
}
def get_psk(ident):
""" Get the PSK for the client """
print(f"Get PSK for {ident}")
return psk_table.get(ident, b'')
context.set_psk_server_callback(get_psk)
server = HTTPServer(("0.0.0.0", 8080), SimpleHTTPRequestHandler)
server.socket = context.wrap_socket(server.socket, server_side=True)
server.serve_forever()

View File

@@ -53,7 +53,7 @@ from pySim.cards import UiccCardBase
from pySim.exceptions import *
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength, EventDownload, EventList
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
@@ -71,24 +71,46 @@ class MyApduTracer(ApduTracer):
print("-> %s %s" % (cmd[:10], cmd[10:]))
print("<- %s: %s" % (sw, resp))
class TcpProtocol(protocol.Protocol):
def dataReceived(self, data):
pass
def connectionLost(self, reason):
pass
def tcp_connected_callback(p: protocol.Protocol):
"""called by twisted TCP client."""
logger.error("%s: connected!" % p)
for data in p.pending_tx:
p.transport.write(data)
class ProactChannel:
"""Representation of a single protective channel."""
class ProactChannel(protocol.Protocol):
"""Representation of a single proective channel."""
def __init__(self, channels: 'ProactChannels', chan_nr: int):
self.channels = channels
self.chan_nr = chan_nr
self.ep = None
self.pending_tx = []
self.pending_rx = bytearray()
def write(self, data: bytes):
if self.connected:
self.transport.write(data)
else:
self.pending_tx.append(data)
def dataReceived(self, data: bytes):
logger.error(f"Got data (len={len(data)}): {data}")
self.pending_rx.extend(data)
# Send ENVELOPE with EventDownload Data available
event_list_ie = EventList(decoded=[ EventList.Event.data_available])
channel_status_ie = ChannelStatus(decoded='8100')
channel_data_len_ie = ChannelDataLength(decoded=min(255,len(self.pending_rx)))
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
event_dl = EventDownload(children=[event_list_ie, dev_ids, channel_status_ie, channel_data_len_ie])
# 3) send to the card
envelope_hex = b2h(event_dl.to_tlv())
logger.info("ENVELOPE Event: %s" % envelope_hex)
global g_ms
(data, sw) = g_ms.scc.envelope(envelope_hex)
logger.info("SW %s: %s" % (sw, data))
# FIXME: Handle result?!
def connectionLost(self, reason):
logger.error("connection lost: %s" % reason)
def close(self):
"""Close the channel."""
@@ -174,14 +196,13 @@ class Proact(ProactiveHandler):
raise ValueError('Unsupported protocol_type')
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
raise ValueError('Unsupported type_of_address')
ipv4_bytes = h2b(other_addr_ie.decoded['address'])
ipv4_bytes = other_addr_ie.decoded['address']
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
port_nr = transp_lvl_ie.decoded['port_number']
print("%s:%u" % (ipv4_str, port_nr))
logger.error("OpenChannel opening with %s:%u" % (ipv4_str, port_nr))
channel = self.channels.channel_create()
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
channel.prot = TcpProtocol()
d = endpoints.connectProtocol(channel.ep, channel.prot)
d = endpoints.connectProtocol(channel.ep, channel)
# FIXME: why is this never called despite the client showing the inbound connection?
d.addCallback(tcp_connected_callback)
@@ -213,6 +234,17 @@ class Proact(ProactiveHandler):
# ]}
logger.info("ReceiveData")
logger.info(pcmd)
dev_id_ie = Proact._find_first_element_of_type(pcmd.children, DeviceIdentities)
chan_data_len_ie = Proact._find_first_element_of_type(pcmd.children, ChannelDataLength)
len_requested = chan_data_len_ie.decoded
chan_str = dev_id_ie.decoded['dest_dev_id']
chan_nr = 1 # FIXME
chan = self.channels.channels.get(chan_nr, None)
requested = chan.pending_rx[:len_requested]
chan.pending_rx = chan.pending_rx[len_requested:]
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
# Terminal Response example: [
# {'command_details': {'command_number': 1,
# 'type_of_command': 'receive_data',
@@ -222,7 +254,8 @@ class Proact(ProactiveHandler):
# {'channel_data': '16030100040e000000'},
# {'channel_data_length': 0}
# ]
return self.prepare_response(pcmd) + []
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
return resp
def handle_SendData(self, pcmd: ProactiveCommand):
"""Send/write data received from the SIM to the socket."""
@@ -240,7 +273,10 @@ class Proact(ProactiveHandler):
chan_str = dev_id_ie.decoded['dest_dev_id']
chan_nr = 1 # FIXME
chan = self.channels.channels.get(chan_nr, None)
# FIXME chan.prot.transport.write(h2b(chan_data_ie.decoded))
# FIXME
logger.error(f"Chan data received: {chan_data_ie.decoded}")
chan.write(chan_data_ie.decoded)
#chan.write(h2b(chan_data_ie.decoded))
# Terminal Response example: [
# {'command_details': {'command_number': 1,
# 'type_of_command': 'send_data',
@@ -425,4 +461,3 @@ if __name__ == '__main__':
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
g_ms.connect_to_card(tp)
reactor.run()

107
smpp_ota_apdu.py Executable file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
import logging
import sys
from pprint import pprint as pp
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
import smpplib.gsm
import smpplib.client
import smpplib.consts
import argparse
logger = logging.getLogger(__name__)
# if you want to know what's happening
logging.basicConfig(level='DEBUG')
class Foo:
def smpp_rx_handler(self, pdu):
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
if pdu.short_message:
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
pp(dec)
return None
def __init__(self, kic, kid, idx, tar):
# Two parts, UCS2, SMS with UDH
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
# Print when obtain message_id
client.set_message_sent_handler(
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
#client.set_message_received_handler(
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
client.set_message_received_handler(self.smpp_rx_handler)
client.connect()
client.bind_transceiver(system_id='test', password='test')
self.client = client
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=idx, kic=h2b(kic),
algo_auth='triple_des_cbc2', kid_idx=idx, kid=h2b(kid))
self.ota_keyset.cntr = 0xdadb
self.tar = h2b(tar)
self.ota_dialect = OtaDialectSms()
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def tx_sms_tpdu(self, tpdu: bytes):
self.client.send_message(
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure it is a byte string, not unicode:
source_addr='12',
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure thease two params are byte strings, not unicode:
destination_addr='23',
short_message=tpdu,
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
#registered_delivery=True,
)
def tx_c_apdu(self, apdu: bytes):
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--kic')
parser.add_argument('-d', '--kid')
parser.add_argument('-i', '--idx', type=int, default=1)
parser.add_argument('-t', '--tar', default='b00011')
parser.add_argument('apdu', default="", nargs='+')
args = parser.parse_args()
f = Foo(args.kic, args.kid, args.idx, args.tar)
print("initialized, sending APDU")
f.tx_c_apdu(h2b("".join(args.apdu)))
f.client.listen()