diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh index de1d9f88..b95e80ad 100755 --- a/contrib/jenkins.sh +++ b/contrib/jenkins.sh @@ -42,6 +42,9 @@ case "$JOB_TYPE" in # Run pySim-shell integration tests (requires physical cards) python3 -m unittest discover -v -s ./tests/pySim-shell_test/ + + # Run pySim-smpp2sim test + tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh ;; "distcheck") virtualenv -p python3 venv --system-site-packages diff --git a/contrib/smpp-ota-tool.py b/contrib/smpp-ota-tool.py new file mode 100755 index 00000000..3dc21850 --- /dev/null +++ b/contrib/smpp-ota-tool.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 + +# (C) 2026 by sysmocom - s.f.m.c. GmbH +# All Rights Reserved +# +# Author: Harald Welte, Philipp Maier +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import argparse +import logging +import smpplib.gsm +import smpplib.client +import smpplib.consts +import time +from pySim.ota import OtaKeyset, OtaDialectSms, OtaAlgoCrypt, OtaAlgoAuth, CNTR_REQ, RC_CC_DS, POR_REQ +from pySim.utils import b2h, h2b, is_hexstr +from pathlib import Path + +logger = logging.getLogger(Path(__file__).stem) + +class SmppHandler: + client = None + + def __init__(self, host: str, port: int, + system_id: str, password: str, + ota_keyset: OtaKeyset, spi: dict, tar: bytes): + """ + Initialize connection to SMPP server and set static OTA SMS-TPDU ciphering parameters + Args: + host : Hostname or IPv4/IPv6 address of the SMPP server + port : TCP Port of the SMPP server + system_id: SMPP System-ID used by ESME (client) to bind + password: SMPP Password used by ESME (client) to bind + ota_keyset: OTA keyset to be used for SMS-TPDU ciphering + spi: Security Parameter Indicator (SPI) to be used for SMS-TPDU ciphering + tar: Toolkit Application Reference (TAR) of the targeted card application + """ + + # Create and connect SMPP client + client = smpplib.client.Client(host, port, allow_unknown_opt_params=True) + client.set_message_sent_handler(self.message_sent_handler) + client.set_message_received_handler(self.message_received_handler) + client.connect() + client.bind_transceiver(system_id=system_id, password=password) + self.client = client + + # Setup static OTA parameters + self.ota_dialect = OtaDialectSms() + self.ota_keyset = ota_keyset + self.tar = tar + self.spi = spi + + def __del__(self): + if self.client: + self.client.unbind() + self.client.disconnect() + + def message_received_handler(self, pdu): + if pdu.short_message: + logger.info("SMS-TPDU received: %s", b2h(pdu.short_message)) + try: + dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message) + except ValueError: + # Retry to decoding with ciphering disabled (in case the card has problems to decode the SMS-TDPU + # we have sent, the response will contain an unencrypted error message) + spi = self.spi.copy() + spi['por_shall_be_ciphered'] = False + spi['por_rc_cc_ds'] = 'no_rc_cc_ds' + dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message) + logger.info("SMS-TPDU decoded: %s", dec) + self.response = dec + return None + + def message_sent_handler(self, pdu): + logger.debug("SMS-TPDU sent: pdu_sequence=%s pdu_message_id=%s", pdu.sequence, pdu.message_id) + + def transceive_sms_tpdu(self, tpdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple: + """ + Transceive SMS-TPDU. This method sends the SMS-TPDU to the SMPP server, and waits for a response. The method + returns when the response is received. + + Args: + tpdu : short message content (plaintext) + src_addr : short message source address + dest_addr : short message destination address + timeout : timeout after which this method should give up waiting for a response + Returns: + tuple containing the response (plaintext) + """ + + logger.info("SMS-TPDU sending: %s...", b2h(tpdu)) + + self.client.send_message( + # TODO: add parameters to switch source_addr_ton and dest_addr_ton between SMPP_TON_INTL and SMPP_NPI_ISDN + source_addr_ton=smpplib.consts.SMPP_TON_INTL, + source_addr=src_addr, + dest_addr_ton=smpplib.consts.SMPP_TON_INTL, + destination_addr=dest_addr, + short_message=tpdu, + # TODO: add parameters to set data_coding and esm_class + data_coding=smpplib.consts.SMPP_ENCODING_BINARY, + esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI, + protocol_id=0x7f, + # TODO: add parameter to use registered delivery + # registered_delivery=True, + ) + + logger.info("SMS-TPDU sent, waiting for response...") + timestamp_sent=int(time.time()) + self.response = None + while self.response is None: + self.client.poll() + if int(time.time()) - timestamp_sent > timeout: + raise ValueError("Timeout reached, no response SMS-TPDU received!") + return self.response + + def transceive_apdu(self, apdu: bytes, src_addr: str, dest_addr: str, timeout: int) -> tuple[bytes, bytes]: + """ + Transceive APDU. This method wraps the given APDU into an SMS-TPDU, sends it to the SMPP server and waits for + the response. When the response is received, the last response data and the last status word is extracted from + the response and returned to the caller. + + Args: + apdu : one or more concatenated APDUs + src_addr : short message source address + dest_addr : short message destination address + timeout : timeout after which this method should give up waiting for a response + Returns: + tuple containing the last response data and the last status word as byte strings + """ + + logger.info("C-APDU sending: %s..." % b2h(apdu)) + + # translate to Secured OTA RFM + secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu) + # add user data header + tpdu = b'\x02\x70\x00' + secured + # send via SMPP + response = self.transceive_sms_tpdu(tpdu, src_addr, dest_addr, timeout) + + # Extract last_response_data and last_status_word from the response + sw = None + resp = None + for container in response: + if container: + container_dict = dict(container) + resp = container_dict.get('last_response_data') + sw = container_dict.get('last_status_word') + if resp is None: + raise ValueError("Response does not contain any last_response_data, no R-APDU received!") + if sw is None: + raise ValueError("Response does not contain any last_status_word, no R-APDU received!") + + logger.info("R-APDU received: %s %s", resp, sw) + return h2b(resp), h2b(sw) + +if __name__ == '__main__': + option_parser = argparse.ArgumentParser(description='CSV importer for pySim-shell\'s PostgreSQL Card Key Provider', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + option_parser.add_argument("--host", help="Host/IP of the SMPP server", default="localhost") + option_parser.add_argument("--port", help="TCP port of the SMPP server", default=2775, type=int) + option_parser.add_argument("--system-id", help="System ID to use to bind to the SMPP server", default="test") + option_parser.add_argument("--password", help="Password to use to bind to the SMPP server", default="test") + option_parser.add_argument("--verbose", help="Enable verbose logging", action='store_true', default=False) + algo_crypt_choices = [] + algo_crypt_classes = OtaAlgoCrypt.__subclasses__() + for cls in algo_crypt_classes: + algo_crypt_choices.append(cls.enum_name) + option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices, default='triple_des_cbc2', + help="OTA crypt algorithm") + algo_auth_choices = [] + algo_auth_classes = OtaAlgoAuth.__subclasses__() + for cls in algo_auth_classes: + algo_auth_choices.append(cls.enum_name) + option_parser.add_argument("--algo-auth", choices=algo_auth_choices, default='triple_des_cbc2', + help="OTA auth algorithm") + option_parser.add_argument('--kic', required=True, type=is_hexstr, help='OTA key (KIC)') + option_parser.add_argument('--kic_idx', default=1, type=int, help='OTA key index (KIC)') + option_parser.add_argument('--kid', required=True, type=is_hexstr, help='OTA key (KID)') + option_parser.add_argument('--kid_idx', default=1, type=int, help='OTA key index (KID)') + option_parser.add_argument('--cntr', default=0, type=int, help='replay protection counter') + option_parser.add_argument('--tar', required=True, type=is_hexstr, help='Toolkit Application Reference') + option_parser.add_argument("--cntr_req", choices=CNTR_REQ.decmapping.values(), default='no_counter', + help="Counter requirement") + option_parser.add_argument('--ciphering', default=True, type=bool, help='Enable ciphering') + option_parser.add_argument("--rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc', + help="message check (rc=redundency check, cc=crypt. checksum, ds=digital signature)") + option_parser.add_argument('--por-in-submit', default=False, type=bool, + help='require PoR to be sent via SMS-SUBMIT') + option_parser.add_argument('--por-shall-be-ciphered', default=True, type=bool, help='require encrypted PoR') + option_parser.add_argument("--por-rc-cc-ds", choices=RC_CC_DS.decmapping.values(), default='cc', + help="PoR check (rc=redundency check, cc=crypt. checksum, ds=digital signature)") + option_parser.add_argument("--por_req", choices=POR_REQ.decmapping.values(), default='por_required', + help="Proof of Receipt requirements") + option_parser.add_argument('--src-addr', default='12', type=str, help='TODO') + option_parser.add_argument('--dest-addr', default='23', type=str, help='TODO') + option_parser.add_argument('--timeout', default=10, type=int, help='TODO') + option_parser.add_argument('-a', '--apdu', action='append', required=True, type=is_hexstr, help='C-APDU to send') + opts = option_parser.parse_args() + + logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt, + kic_idx=opts.kic_idx, + kic=h2b(opts.kic), + algo_auth=opts.algo_auth, + kid_idx=opts.kic_idx, + kid=h2b(opts.kid), + cntr=opts.cntr) + spi = {'counter' : opts.cntr_req, + 'ciphering' : opts.ciphering, + 'rc_cc_ds': opts.rc_cc_ds, + 'por_in_submit':opts.por_in_submit, + 'por_shall_be_ciphered':opts.por_shall_be_ciphered, + 'por_rc_cc_ds': opts.por_rc_cc_ds, + 'por': opts.por_req} + apdu = h2b("".join(opts.apdu)) + + smpp_handler = SmppHandler(opts.host, opts.port, opts.system_id, opts.password, ota_keyset, spi, h2b(opts.tar)) + resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr, opts.dest_addr, opts.timeout) + print("%s %s" % (b2h(resp), b2h(sw))) diff --git a/pySim/ota.py b/pySim/ota.py index a6b563df..fbb9d451 100644 --- a/pySim/ota.py +++ b/pySim/ota.py @@ -57,12 +57,13 @@ CompactRemoteResp = Struct('number_of_commands'/Int8ub, 'last_response_data'/HexAdapter(GreedyBytes)) RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3) +CNTR_REQ = Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, counter_must_be_higher=2, counter_must_be_lower=3) +POR_REQ = Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2) # TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2 SPI = BitStruct( # first octet Padding(3), - 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, - counter_must_be_higher=2, counter_must_be_lower=3), + 'counter'/CNTR_REQ, 'ciphering'/Flag, 'rc_cc_ds'/RC_CC_DS, # second octet @@ -70,8 +71,7 @@ SPI = BitStruct( # first octet 'por_in_submit'/Flag, 'por_shall_be_ciphered'/Flag, 'por_rc_cc_ds'/RC_CC_DS, - 'por'/Enum(BitsInteger(2), no_por=0, - por_required=1, por_only_when_error=2) + 'por'/POR_REQ ) # TS 102 225 Section 5.1.2 diff --git a/requirements.txt b/requirements.txt index 2ffd9998..4ceec452 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ git+https://github.com/osmocom/asn1tools packaging git+https://github.com/hologram-io/smpp.pdu smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted +smpplib diff --git a/setup.py b/setup.py index dffff5b5..be811225 100644 --- a/setup.py +++ b/setup.py @@ -55,6 +55,7 @@ setup( "service-identity", "pyopenssl", "requests", + "smpplib", ], "CardKeyProviderPgsql": [ "psycopg2-binary", diff --git a/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg new file mode 100644 index 00000000..1c2a9536 --- /dev/null +++ b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg @@ -0,0 +1,9 @@ +# Card parameter: +ICCID="8949440000001155314" +KIC='51D4FC44BCBA7C4589DFADA3297720AF' +KID='0449699C472CE71E2FB7B56245EF7684' + +# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response +TAR='B00010' +APDU='A0A40000027F20A0C0000016' +EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000' \ No newline at end of file diff --git a/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh new file mode 100755 index 00000000..2ca0a9e4 --- /dev/null +++ b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh @@ -0,0 +1,156 @@ +#!/bin/bash + +# Utility to verify the functionality of pySim-trace.py +# +# (C) 2026 by sysmocom - s.f.m.c. GmbH +# All Rights Reserved +# +# Author: Philipp Maier +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +PYSIM_SMPP2SIM=./pySim-smpp2sim.py +PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log +PYSIM_SMPP2SIM_PORT=2775 +PYSIM_SMPP2SIM_TIMEOUT=10 +PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py +PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log +PYSIM_SHELL=./pySim-shell.py + +function dump_logs { + echo "" + echo "$PYSIM_SMPPOTATOOL_LOG" + echo "------------8<------------" + cat $PYSIM_SMPPOTATOOL_LOG + echo "------------8<------------" + echo "" + echo "$PYSIM_SMPP2SIM_LOG" + echo "------------8<------------" + cat $PYSIM_SMPP2SIM_LOG + echo "------------8<------------" +} + +function send_test_request { + echo "" + echo "Sending request to SMPP server:" + TAR=$1 + C_APDU=$2 + R_APDU_EXPECTED=$3 + + echo "Sending: $C_APDU" + COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT --kic $KIC --kid $KID --tar $TAR --apdu $C_APDU" + echo "Commandline: $COMMANDLINE" + R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG` + if [ $? -ne 0 ]; then + echo "Unable to send request! -- failed!" + dump_logs + exit 1 + fi + + echo "Got response from SMPP server:" + echo "Sent: $C_APDU" + echo "Received: $R_APDU" + echo "Expected: $R_APDU_EXPECTED" + if [ "$R_APDU" != "$R_APDU_EXPECTED" ]; then + echo "Response does not match the expected response! -- failed!" + dump_logs + exit 1 + fi + echo "Response matches the expected response -- success!" + echo "" +} + +function start_smpp_server { + PCSC_READER=$1 + + # Start the SMPP server + echo "" + echo "Starting SMPP server:" + + COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port $PYSIM_SMPP2SIM_PORT --apdu-trace" + echo "Commandline: $COMMANDLINE" + $COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 & + PYSIM_SMPP2SIM_PID=$! + trap 'kill $PYSIM_SMPP2SIM_PID' EXIT + echo "SMPP server started (PID=$PYSIM_SMPP2SIM_PID)" + + # Wait until the SMPP server is reachable + RC=1 + RETRY_COUNT=0 + while [ $RC -ne 0 ]; do + nc -z localhost $PYSIM_SMPP2SIM_PORT + RC=$? + ((RETRY_COUNT++)) + if [ $RETRY_COUNT -gt $PYSIM_SMPP2SIM_TIMEOUT ]; then + echo "SMPP server not reachable (port=$PYSIM_SMPP2SIM_PORT) -- abort" + dump_logs + exit 1 + fi + sleep 1 + done + echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)" +} + +function find_card_by_iccid { + # Find reader number of the card + ICCID=$1 + + echo "" + echo "Searching for card:" + echo "ICCID: \"$ICCID\"" + + if [ -z "$ICCID" ]; then + echo "invalid ICCID, zero length ICCID is not allowed! -- abort" + exit 1 + fi + + PCSC_READER_COUNT=`pcsc_scan -rn | wc -l` + for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do + echo "probing card in reader $PCSC_READER ..." + EF_ICCID_DECODED=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e 'select EF.ICCID' -e 'read_binary_decoded --oneline' 2> /dev/null | tail -1` + echo $EF_ICCID_DECODED | grep $ICCID > /dev/null + if [ $? -eq 0 ]; then + echo "Found card in reader $PCSC_READER" + return $PCSC_READER + fi + done + + echo "Card with ICCID \"$ICCID\" not found -- abort" + exit 1 +} + +export PYTHONPATH=./ + +echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py" +echo "==============================================================" + +# TODO: At the moment we can only have one card and one testcase. This is +# sufficient for now. We can extend this later as needed. + +# Read test parameters from config from file +TEST_CONFIG_FILE=${0%.*}.cfg +echo "using config file: $TEST_CONFIG_FILE" +if ! [ -e "$TEST_CONFIG_FILE" ]; then + echo "test configuration file does not exist! -- abort" + exit 1 +fi +. $TEST_CONFIG_FILE + +# Execute testcase +find_card_by_iccid $ICCID +start_smpp_server $? +send_test_request $TAR $APDU "$EXPECTED_RESPONSE" + + +