3 Commits

Author SHA1 Message Date
Daniel Willmann
3506940448 pySim-smpp2sim: Implement handle_send-/receivedata
Change-Id: Icec9265520a6ab20cb4764e9cab5cdab31841862
2026-01-23 00:38:17 +01:00
Daniel Willmann
637276472d Add script to send ota PDU via SMPP
Change-Id: Idda80d57c26a9b3a33766bd06fc1af54db56874c
2026-01-23 00:28:37 +01:00
Daniel Willmann
a105b55751 Add osmo-ras.py Server for RAM over HTTP
Change-Id: Ib8fdc8f00f0b5bcd3365eca49b611328343a7edb
2026-01-23 00:27:20 +01:00
11 changed files with 217 additions and 57 deletions

View File

@@ -329,7 +329,7 @@ def do_info(pes: ProfileElementSequence, opts):
print("Security domain Instance AID: %s" % b2h(sd.decoded['instance']['instanceAID']))
# FIXME: 'applicationSpecificParametersC9' parsing to figure out enabled SCP
for key in sd.keys:
print("\t%s" % repr(key))
print("\tKVN=0x%02x, KID=0x%02x, %s" % (key.key_version_number, key.key_identifier, key.key_components))
# RFM
print()

46
osmo-ras.py Executable file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
# Remote Application Server for Remote Application Management over HTTP
# See Amendment B of the GlobalPlatform Card Specification v2.2
#
# (C) 2025 sysmocom s.f.m.c.
# Author: Daniel Willmann <dwillmann@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from http.server import HTTPServer, SimpleHTTPRequestHandler
from ssl import PROTOCOL_TLS_SERVER, SSLContext, TLSVersion
context = SSLContext(PROTOCOL_TLS_SERVER)
context.maximum_version = TLSVersion.TLSv1_2
CIPHERS_1_0 = "TLS_PSK_WITH_3DES_EDE_CBC_SHA,TLS_PSK_WITH_AES_128_CBC_SHA,TLS_PSK_WITH_NULL_SHA"
CIPHERS_1_2 = "TLS_PSK_WITH_AES_128_CBC_SHA256,TLS_PSK_WITH_NULL_SHA256"
context.set_ciphers(CIPHERS_1_2)
# A table using the identity of the client:
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
'ClientId_2': bytes.fromhex('facade')
}
def get_psk(ident):
""" Get the PSK for the client """
print(f"Get PSK for {ident}")
return psk_table.get(ident, b'')
context.set_psk_server_callback(get_psk)
server = HTTPServer(("0.0.0.0", 8080), SimpleHTTPRequestHandler)
server.socket = context.wrap_socket(server.socket, server_side=True)
server.serve_forever()

View File

@@ -618,7 +618,7 @@ class SmDppHttpServer:
# Verify EID is within permitted range of EUM certificate
if not validate_eid_range(ss.eid, eum_cert):
logger.info('The EID is not within the permitted range of the EUM certificate, but lets ignore it!')
raise ApiError('8.1.4', '6.1', 'EID is not within the permitted range of the EUM certificate')
# Verify that the serverChallenge attached to the ongoing RSP session matches the
# serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -

View File

@@ -53,7 +53,7 @@ from pySim.cards import UiccCardBase
from pySim.exceptions import *
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength, EventDownload, EventList
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
@@ -71,24 +71,46 @@ class MyApduTracer(ApduTracer):
print("-> %s %s" % (cmd[:10], cmd[10:]))
print("<- %s: %s" % (sw, resp))
class TcpProtocol(protocol.Protocol):
def dataReceived(self, data):
pass
def connectionLost(self, reason):
pass
def tcp_connected_callback(p: protocol.Protocol):
"""called by twisted TCP client."""
logger.error("%s: connected!" % p)
for data in p.pending_tx:
p.transport.write(data)
class ProactChannel:
"""Representation of a single protective channel."""
class ProactChannel(protocol.Protocol):
"""Representation of a single proective channel."""
def __init__(self, channels: 'ProactChannels', chan_nr: int):
self.channels = channels
self.chan_nr = chan_nr
self.ep = None
self.pending_tx = []
self.pending_rx = bytearray()
def write(self, data: bytes):
if self.connected:
self.transport.write(data)
else:
self.pending_tx.append(data)
def dataReceived(self, data: bytes):
logger.error(f"Got data (len={len(data)}): {data}")
self.pending_rx.extend(data)
# Send ENVELOPE with EventDownload Data available
event_list_ie = EventList(decoded=[ EventList.Event.data_available])
channel_status_ie = ChannelStatus(decoded='8100')
channel_data_len_ie = ChannelDataLength(decoded=min(255,len(self.pending_rx)))
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
event_dl = EventDownload(children=[event_list_ie, dev_ids, channel_status_ie, channel_data_len_ie])
# 3) send to the card
envelope_hex = b2h(event_dl.to_tlv())
logger.info("ENVELOPE Event: %s" % envelope_hex)
global g_ms
(data, sw) = g_ms.scc.envelope(envelope_hex)
logger.info("SW %s: %s" % (sw, data))
# FIXME: Handle result?!
def connectionLost(self, reason):
logger.error("connection lost: %s" % reason)
def close(self):
"""Close the channel."""
@@ -174,14 +196,13 @@ class Proact(ProactiveHandler):
raise ValueError('Unsupported protocol_type')
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
raise ValueError('Unsupported type_of_address')
ipv4_bytes = h2b(other_addr_ie.decoded['address'])
ipv4_bytes = other_addr_ie.decoded['address']
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
port_nr = transp_lvl_ie.decoded['port_number']
print("%s:%u" % (ipv4_str, port_nr))
logger.error("OpenChannel opening with %s:%u" % (ipv4_str, port_nr))
channel = self.channels.channel_create()
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
channel.prot = TcpProtocol()
d = endpoints.connectProtocol(channel.ep, channel.prot)
d = endpoints.connectProtocol(channel.ep, channel)
# FIXME: why is this never called despite the client showing the inbound connection?
d.addCallback(tcp_connected_callback)
@@ -213,6 +234,17 @@ class Proact(ProactiveHandler):
# ]}
logger.info("ReceiveData")
logger.info(pcmd)
dev_id_ie = Proact._find_first_element_of_type(pcmd.children, DeviceIdentities)
chan_data_len_ie = Proact._find_first_element_of_type(pcmd.children, ChannelDataLength)
len_requested = chan_data_len_ie.decoded
chan_str = dev_id_ie.decoded['dest_dev_id']
chan_nr = 1 # FIXME
chan = self.channels.channels.get(chan_nr, None)
requested = chan.pending_rx[:len_requested]
chan.pending_rx = chan.pending_rx[len_requested:]
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
# Terminal Response example: [
# {'command_details': {'command_number': 1,
# 'type_of_command': 'receive_data',
@@ -222,7 +254,8 @@ class Proact(ProactiveHandler):
# {'channel_data': '16030100040e000000'},
# {'channel_data_length': 0}
# ]
return self.prepare_response(pcmd) + []
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
return resp
def handle_SendData(self, pcmd: ProactiveCommand):
"""Send/write data received from the SIM to the socket."""
@@ -240,7 +273,10 @@ class Proact(ProactiveHandler):
chan_str = dev_id_ie.decoded['dest_dev_id']
chan_nr = 1 # FIXME
chan = self.channels.channels.get(chan_nr, None)
# FIXME chan.prot.transport.write(h2b(chan_data_ie.decoded))
# FIXME
logger.error(f"Chan data received: {chan_data_ie.decoded}")
chan.write(chan_data_ie.decoded)
#chan.write(h2b(chan_data_ie.decoded))
# Terminal Response example: [
# {'command_details': {'command_number': 1,
# 'type_of_command': 'send_data',
@@ -425,4 +461,3 @@ if __name__ == '__main__':
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
g_ms.connect_to_card(tp)
reactor.run()

View File

@@ -144,9 +144,6 @@ class File:
def file_size(self) -> Optional[int]:
"""Return the size of the file in bytes."""
if self.file_type in ['LF', 'CY']:
if self._file_size and self.nb_rec is None and self.rec_len:
self.nb_rec = self._file_size // self.rec_len
return self.nb_rec * self.rec_len
elif self.file_type in ['TR', 'BT']:
return self._file_size
@@ -294,13 +291,6 @@ class File:
dfName = fileDescriptor.get('dfName', None)
if dfName:
self.df_name = dfName
dfName = fileDescriptor.get('dfName', None)
if dfName:
self.df_name = dfName
efFileSize = fileDescriptor.get('efFileSize', None)
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
pefi = fileDescriptor.get('proprietaryEFInfo', {})
securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
if securityAttributesReferenced:
@@ -310,11 +300,13 @@ class File:
fdb_dec = fd_dec['file_descriptor_byte']
self.shareable = fdb_dec['shareable']
if fdb_dec['file_type'] == 'working_ef':
efFileSize = fileDescriptor.get('efFileSize', None)
if fd_dec['num_of_rec']:
self.nb_rec = fd_dec['num_of_rec']
if fd_dec['record_len']:
self.rec_len = fd_dec['record_len']
if efFileSize:
self._file_size = self._decode_file_size(efFileSize)
if self.rec_len and self.nb_rec == None:
# compute the number of records from file size and record length
self.nb_rec = self._file_size // self.rec_len
@@ -360,9 +352,6 @@ class File:
fd = self.get_tuplelist_item(l, 'fileDescriptor')
if not fd and not self._template_derived:
raise ValueError("%s: No fileDescriptor found in tuple, and none set by template before" % self)
if self.name == "EF.SUCI_Calc_Info":
breakpoint()
#breakpoint()
if fd:
self.from_fileDescriptor(dict(fd))
# BODY
@@ -397,16 +386,6 @@ class File:
stream = io.BytesIO()
# Providing file content within "fillFileContent" / "fillFileOffset" shall have the same effect as
# creating a file with a fill/repeat pattern and thereafter updating the content via Update.
# Step 0: determine file size
#file_size = self._file_size
#for k, v in l:
# if k != 'fileDescriptor':
# continue
# file_desc = v
# if 'efFileSize' in file_desc:
# file_size = self._decode_file_size(file_desc['efFileSize'])
# Step 1: Fill with pattern from Fcp or Template
if self.fill_pattern:
stream.write(self.expand_fill_pattern())
@@ -1006,9 +985,9 @@ class SecurityDomainKey:
self.key_components = key_components
def __repr__(self) -> str:
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=0x%x, Comp=%s)' % (self.key_version_number,
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=%s, Comp=%s)' % (self.key_version_number,
self.key_identifier,
build_construct(KeyUsageQualifier, self.key_usage_qualifier)[0],
self.key_usage_qualifier,
repr(self.key_components))
@classmethod

View File

@@ -673,7 +673,7 @@ class FilesUsimDf5GS(ProfileTemplate):
FileTemplate(0x4f06, 'EF.UAC_AIC', 'TR', None, 4, 2, 0x06, None, True, ass_serv=[126]),
FileTemplate(0x4f07, 'EF.SUCI_Calc_Info', 'TR', None, None, 2, 0x07, 'FF...FF', False, ass_serv=[124]),
FileTemplate(0x4f08, 'EF.OPL5G', 'LF', None, 10, 10, 0x08, 'FF...FF', False, ['nb_rec'], ass_serv=[129]),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130], pe_name='ef-supinai'),
FileTemplate(0x4f09, 'EF.SUPI_NAI', 'TR', None, None, 2, 0x09, None, True, ['size'], ass_serv=[130]),
FileTemplate(0x4f0a, 'EF.Routing_Indicator', 'TR', None, 4, 2, 0x0a, 'F0FFFFFF', False, ass_serv=[124]),
]
@@ -818,7 +818,7 @@ class FilesIsimOptional(ProfileTemplate):
base_path = Path('ADF.ISIM')
extends = FilesIsimMandatory
files = [
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5], pe_name='ef-pcscf'),
FileTemplate(0x6f09, 'EF.P-CSCF', 'LF', 1, None, 2, None, None, True, ['size'], ass_serv=[1,5]),
FileTemplate(0x6f3c, 'EF.SMS', 'LF', 10, 176, 5, None, '00FF...FF', False, ass_serv=[6,8]),
FileTemplate(0x6f42, 'EF.SMSP', 'LF', 1, 38, 5, None, 'FF...FF', False, ass_serv=[8]),
FileTemplate(0x6f43, 'EF.SMSS', 'TR', None, 2, 5, None, 'FFFF', False, ass_serv=[6,8]),

View File

@@ -477,15 +477,11 @@ class RuntimeLchan:
def get_file_for_filename(self, name: str):
"""Get the related CardFile object for a specified filename."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables()
return sels[name]
def activate_file(self, name: str):
"""Request ACTIVATE FILE of specified file."""
if is_hex(name):
name = name.lower()
sels = self.selected_file.get_selectables()
f = sels[name]
data, sw = self.scc.activate_file(f.fid)

View File

@@ -750,7 +750,7 @@ class EF_ARR(LinFixedEF):
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.RECORD_NR)
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline)

View File

@@ -389,10 +389,7 @@ class EF_SUCI_Calc_Info(TransparentEF):
# remaining data holds Home Network Public Key Data Object
hpkl = EF_SUCI_Calc_Info.HnetPubkeyList()
hpkl.from_tlv(in_bytes[pos:])
hnet_pubkey_list = []
if hpkl.to_dict()['hnet_pubkey_list']:
hnet_pubkey_list = self._compact_pubkey_list(hpkl.to_dict()['hnet_pubkey_list'])
hnet_pubkey_list = self._compact_pubkey_list(hpkl.to_dict()['hnet_pubkey_list'])
return {
'prot_scheme_id_list': prot_scheme_id_list,

View File

@@ -21,7 +21,7 @@ setup(
"pyscard",
"pyserial",
"pytlv",
"cmd2 >= 1.5.0, < 3.0",
"cmd2 >= 1.5.0",
"jsonpath-ng",
"construct >= 2.10.70",
"bidict",

107
smpp_ota_apdu.py Executable file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
import logging
import sys
from pprint import pprint as pp
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
import smpplib.gsm
import smpplib.client
import smpplib.consts
import argparse
logger = logging.getLogger(__name__)
# if you want to know what's happening
logging.basicConfig(level='DEBUG')
class Foo:
def smpp_rx_handler(self, pdu):
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
if pdu.short_message:
try:
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
except ValueError:
spi = self.spi.copy()
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
pp(dec)
return None
def __init__(self, kic, kid, idx, tar):
# Two parts, UCS2, SMS with UDH
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
# Print when obtain message_id
client.set_message_sent_handler(
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
#client.set_message_received_handler(
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
client.set_message_received_handler(self.smpp_rx_handler)
client.connect()
client.bind_transceiver(system_id='test', password='test')
self.client = client
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=idx, kic=h2b(kic),
algo_auth='triple_des_cbc2', kid_idx=idx, kid=h2b(kid))
self.ota_keyset.cntr = 0xdadb
self.tar = h2b(tar)
self.ota_dialect = OtaDialectSms()
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def tx_sms_tpdu(self, tpdu: bytes):
self.client.send_message(
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure it is a byte string, not unicode:
source_addr='12',
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
# Make sure thease two params are byte strings, not unicode:
destination_addr='23',
short_message=tpdu,
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
protocol_id=0x7f,
#registered_delivery=True,
)
def tx_c_apdu(self, apdu: bytes):
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--kic')
parser.add_argument('-d', '--kid')
parser.add_argument('-i', '--idx', type=int, default=1)
parser.add_argument('-t', '--tar', default='b00011')
parser.add_argument('apdu', default="", nargs='+')
args = parser.parse_args()
f = Foo(args.kic, args.kid, args.idx, args.tar)
print("initialized, sending APDU")
f.tx_c_apdu(h2b("".join(args.apdu)))
f.client.listen()