15 Commits

Author SHA1 Message Date
Vadim Yanitskiy
8f38800643 pySim-shell.py: make it work with cmd2 >= v2.4.0
In v2.3.0 both cmd2.{fg,bg} have been deprecated in favour of cmd2.{Fg,Bg}.
In v2.4.0 both cmd2.{fg,bg} have been removed.

See https://github.com/python-cmd2/cmd2/blob/master/CHANGELOG.md

Change-Id: I7ca95e85fc45ba66fd9ba6bea1fec2bae0e892c0
2022-09-05 23:27:43 +07:00
Vadim Yanitskiy
d5c1bec869 pySim-shell.py: make it work with cmd2 >= v2.0.0
* Argument 'use_ipython' was renamed to 'use_ipython'.
* Class 'Settable' requires the reference to the object that holds
  the settable attribute.

See https://github.com/python-cmd2/cmd2/releases/tag/2.0.0.

Change-Id: Ia38f0ca5c3f41395f8fe850adae37f5af4e3fe19
2022-09-05 23:27:43 +07:00
Vadim Yanitskiy
7d05e49f11 README.md: update installation instructions for Debian
Change-Id: Icefa33570a34960a4fff145f3c1b6585d867605c
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
98ea2a0f7a README.md: update git URLs (git -> https; gitea)
Change-Id: Ia86979f656557e442b0f432b0646aa7661c293e9
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
0a8d27ad7a README.md: list recent dependencies from requirements.txt
Change-Id: Ia486dbc7f630c1404e51728b5353cf5a0d643415
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
9550a0a45b README.md: fix module name: s/serial/pyserial/
Change-Id: I5fd308fb161cd5bd5f702845691296877e523248
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
b5eaf14991 README.md,requirements.txt: add missing construct version info
Change-Id: I90da0df431f0d7dbfa4aa428366fbf0e35db388f
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
bdac3f61be Bump minimum required construct version to v2.9.51
With this version I can get all unittests passing:

  python -m unittest discover tests/

We're passing argument 'path' to stream_read_entire(), which was
added in [1] and become available since v2.9.51.

Change-Id: I4223c83570d333ad8d79bc2aa2d8bcc580156cff
Related: [1] bfe71315b027e18e62f00ec4de75043992fd2316 construct.git
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
05d30eb666 construct: use Python's API for int<->bytes conversion
Argument 'signed' was added in [1] and become available since v2.10.63.
Therefore using bytes2integer() and integer2bytes() from construct.core
bumps the minimum required version of construct to v2.10.63.  For
instance, debian:bullseye currently ships v2.10.58.

There is no strict requirement to use construct's API, so let's use
Python's API instead.  This allows using older construct versions
from the v2.9.xx family.

Change-Id: I613dbfebe993f9c19003635371941710fc1b1236
Related: [1] 660ddbe2d9a351731ad7976351adbf413809a715 construct.git
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
7800f9d356 contrib/jenkins.sh: install dependencies from requirements.txt
Change-Id: I99af496e9a3758ea624ca484f4fbc51b262ffaf4
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
7ce04a5a29 contrib/jenkins.sh: execute this script with -x and -e
-x  Print commands and their arguments as they are executed
  -e  Exit immediately if a command exits with a non-zero status

Change-Id: I13af70ef770936bec00b050b6c4f988e53ee2833
2022-09-05 23:15:06 +07:00
Vadim Yanitskiy
b3ea021b32 contrib/jenkins.sh: speed up pylint by running multiple processes
Use multiple processes to speed up pylint.  Specifying -j0 will
auto-detect the number of processors available to use.

On AMD Ryzen 7 3700X this significantly reduces the exec time:

  $ time python -m pylint -j1 ... pySim *.py
  real    0m12.409s
  user    0m12.149s
  sys     0m0.136s

  $ time python -m pylint -j0 ... pySim *.py
  real    0m5.541s
  user    0m58.496s
  sys     0m1.213s

Change-Id: I76d1696c27ddcab358526f807c4a0a7f0d4c85d4
2022-08-30 17:15:53 +07:00
Vadim Yanitskiy
12175d3588 contrib/jenkins.sh: pylint v2.15 is unstable, pin v2.14.5
pylint v2.15 is crashing, let's fall-back to a known to work v2.14.5.

Change-Id: Ie29be6ec6631ff2b3d8cd6b2dd9ac0ed8f505e4f
Related: https://github.com/PyCQA/pylint/issues/7375
Related: OS#5668
2022-08-30 17:12:03 +07:00
Christian Amsüss
59f3b1154f proactive: Send a Terminal Response automatically after a Fetch
Change-Id: I43bc994e7517b5907fb40a98d84797c54056c47d
2022-08-21 11:54:33 +00:00
Christian Amsüss
98552ef1bd proactive: Avoid clobbering the output of the command that triggered the FETCH
Change-Id: I2b794a5c5bc808b9703b4bc679c119341a0ed41c
2022-08-21 11:54:00 +00:00
17 changed files with 49 additions and 2092 deletions

View File

@@ -23,10 +23,10 @@ Git Repository
You can clone from the official Osmocom git repository using
```
git clone git://git.osmocom.org/pysim.git
git clone https://gitea.osmocom.org/sim-card/pysim.git
```
There is a cgit interface at <https://git.osmocom.org/pysim>
There is a web interface at <https://gitea.osmocom.org/sim-card/pysim>.
Installation
@@ -35,18 +35,26 @@ Installation
Please install the following dependencies:
- pyscard
- serial
- pyserial
- pytlv
- cmd2 >= 1.3.0 but < 2.0.0
- jsonpath-ng
- construct
- construct >= 2.9.51
- bidict
- gsm0338
- pyyaml >= 5.1
- termcolor
- colorlog
Example for Debian:
```
apt-get install python3-pyscard python3-serial python3-pip python3-yaml python3-termcolor python3-colorlog
pip3 install -r requirements.txt
```sh
sudo apt-get install --no-install-recommends \
pcscd libpcsclite-dev \
python3 \
python3-setuptools \
python3-pyscard \
python3-pip
pip3 install --user -r requirements.txt
```
After installing all dependencies, the pySim applications ``pySim-read.py``, ``pySim-prog.py`` and ``pySim-shell.py`` may be started directly from the cloned repository.

View File

@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/sh -xe
# jenkins build helper script for pysim. This is how we build on jenkins.osmocom.org
#
# environment variables:
@@ -6,8 +6,6 @@
# * PUBLISH: upload manuals after building if set to "1" (ignored without WITH_MANUALS = "1")
#
set -e
if [ ! -d "./pysim-testdata/" ] ; then
echo "###############################################"
echo "Please call from pySim-prog top directory"
@@ -17,18 +15,7 @@ fi
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
pip install pytlv
pip install 'pyyaml>=5.1'
pip install cmd2==1.5
pip install jsonpath-ng
pip install construct
pip install bidict
pip install gsm0338
pip install termcolor
pip install colorlog
pip install pycryptodome
# we need this direct git install, as pypi only lists the python2.7 only release 0.3 from 2013 :(
pip install git+https://github.com/hologram-io/smpp.pdu
pip install -r requirements.txt
# Execute automatically discovered unit tests first
python -m unittest discover -v -s tests/
@@ -39,8 +26,8 @@ python -m unittest discover -v -s tests/
# Ignore E0401: import-error
# pySim/utils.py:276: E0401: Unable to import 'Crypto.Cipher' (import-error)
# pySim/utils.py:277: E0401: Unable to import 'Crypto.Util.strxor' (import-error)
pip install pylint
python -m pylint --errors-only \
pip install pylint==2.14.5 # FIXME: 2.15 is crashing, see OS#5668
python -m pylint -j0 --errors-only \
--disable E1102 \
--disable E0401 \
--enable W0301 \

View File

@@ -1,57 +0,0 @@
#!/usr/bin/python3
from pySim.ota import *
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
from pySim.utils import h2b, h2b
# KIC1 + KID1 of 8988211000000515398
#KIC1 = h2b('C039ED58F7B81446105E79EBFD373038')
#KID1 = h2b('1799B93FE53F430BD7FD4810C77E1FDF')
#KIC3 = h2b('167F2576D64C8D41862954875C8D7979')
#KID3 = h2b('ECAE122B0E6AE4186D6487D50FDC0922')
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
print(od.crypt)
print(od.auth)
dialect = OtaDialectSms()
# RAM: B00000
# SIM RFM: B00010
# USIM RFM: B00011
tar = h2b('B00011')
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
outp = dialect.encode_cmd(od, tar, spi, apdu=b'\x00\xa4\x00\x04\x02\x3f\x00')
print("result: %s" % b2h(outp))
with_udh = b'\x02\x70\x00' + outp
print("with_udh: %s" % b2h(with_udh))
da = AddressField('12345678', 'unknown', 'isdn_e164')
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
print(tpdu)
print("tpdu: %s" % b2h(tpdu.toBytes()))
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
dialect.decode_resp(od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
dialect.decode_resp(od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'no_rc_cc_ds', 'por': 'por_required'}
dialect.decode_resp(od, spi, '027100000e0ab000110000000000000001612f')

View File

@@ -23,7 +23,7 @@ import json
import traceback
import cmd2
from cmd2 import style, fg
from cmd2 import style, Fg
from cmd2 import CommandSet, with_default_category, with_argparser
import argparse
@@ -134,8 +134,8 @@ class PysimApp(cmd2.Cmd):
def __init__(self, card, rs, sl, ch, script=None):
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
use_ipython=True, auto_load_commands=False, startup_script=script)
self.intro = style('Welcome to pySim-shell!', fg=fg.red)
auto_load_commands=False, startup_script=script)
self.intro = style('Welcome to pySim-shell!', fg=Fg.RED)
self.default_category = 'pySim-shell built-in commands'
self.card = None
self.rs = None
@@ -145,16 +145,16 @@ class PysimApp(cmd2.Cmd):
self.ch = ch
self.numeric_path = False
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names',
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names', self,
onchange_cb=self._onchange_numeric_path))
self.conserve_write = True
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write',
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write', self,
onchange_cb=self._onchange_conserve_write))
self.json_pretty_print = True
self.add_settable(cmd2.Settable('json_pretty_print',
bool, 'Pretty-Print JSON output'))
bool, 'Pretty-Print JSON output', self))
self.apdu_trace = False
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card',
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card', self,
onchange_cb=self._onchange_apdu_trace))
self.equip(card, rs)
@@ -189,7 +189,7 @@ class PysimApp(cmd2.Cmd):
self.register_command_set(Iso7816Commands())
self.register_command_set(Ts102222Commands())
self.register_command_set(PySimCommands())
#self.iccid, sw = self.card.read_iccid()
self.iccid, sw = self.card.read_iccid()
self.lchan.select('MF', self)
rc = True
else:
@@ -287,23 +287,23 @@ class PysimApp(cmd2.Cmd):
sys.stderr = self._stderr_backup
def _show_failure_sign(self):
self.poutput(style(" +-------------+", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ### +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" +-------------+", fg=fg.bright_red))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ### +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
self.poutput("")
def _show_success_sign(self):
self.poutput(style(" +-------------+", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" + # ## +", fg=fg.bright_green))
self.poutput(style(" + ## # +", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" +-------------+", fg=fg.bright_green))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + # ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## # +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
self.poutput("")
def _process_card(self, first, script_path):

View File

@@ -27,11 +27,10 @@ logger = colorlog.getLogger()
# merge all of the command sets into one global set. This will override instructions,
# the one from the 'last' set in the addition below will prevail.
from pySim.apdu.ts_51_011 import ApduCommands as SimApduCommands
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
ApduCommands = SimApduCommands + UiccApduCommands + UsimApduCommands #+ GpApduCommands
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
class DummySimLink(LinkBase):

View File

@@ -1,337 +0,0 @@
# coding=utf-8
"""APDU definitions/decoders of 3GPP TS 51.011, the classic SIM spec.
(C) 2022 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
from pySim.construct import *
from pySim.filesystem import *
from pySim.apdu import ApduCommand, ApduCommandSet
from typing import Optional, Dict, Tuple
logger = logging.getLogger(__name__)
# TS 51.011 Section 9.2.1
class SimSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['A0']):
_apdu_case = 4
def process_on_lchan(self, lchan: RuntimeLchan):
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
for file in path:
file_hex = b2h(file)
sels = lchan.selected_file.get_selectables(['FIDS'])
if file_hex in sels:
if self.successful:
#print("\tSELECT %s" % sels[file_hex])
lchan.selected_file = sels[file_hex]
else:
#print("\tSELECT %s FAILED" % sels[file_hex])
pass
continue
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
if len(self.cmd_data) != 2:
raise ValueError('Expecting a 2-byte FID')
# decode the SELECT response
if self.successful:
self.file = lchan.selected_file
if 'body' in self.rsp_dict:
# not every SELECT is asking for the FCP in response...
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
return None
# TS 51.011 Section 9.2.2
class SimStatus(ApduCommand, n='STATUS', ins=0xF2, cla=['A0']):
_apdu_case = 2
def process_on_lchan(self, lchan):
if self.successful:
if 'body' in self.rsp_dict:
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
def _decode_binary_p1p2(p1, p2) -> Dict:
ret = {}
if p1 & 0x80:
ret['file'] = 'sfi'
ret['sfi'] = p1 & 0x1f
ret['offset'] = p2
else:
ret['file'] = 'currently_selected_ef'
ret['offset'] = ((p1 & 0x7f) << 8) & p2
return ret
# TS 51.011 Section 9.2.3 / 31.101
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short reads
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.4 / 31.101
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short writes
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
def _decode_record_p1p2(p1, p2):
ret = {}
ret['record_number'] = p1
if p2 >> 3 == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = p2 >> 3
mode = p2 & 0x7
if mode == 2:
ret['mode'] = 'next_record'
elif mode == 3:
ret['mode'] = 'previous_record'
elif mode == 8:
ret['mode'] = 'absolute_current'
return ret
# TS 51.011 Section 9.2.5
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['A0']):
_apdu_case = 2
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 51.011 Section 9.2.6
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['A0']):
_apdu_case = 3
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
# TS 51.011 Section 9.2.7
class Seek(ApduCommand, n='SEEK', ins=0xA2, cla=['A0']):
_apdu_case = 4
_construct_rsp = GreedyRange(Int8ub)
def _decode_p1p2(self):
ret = {}
sfi = self.p2 >> 3
if sfi == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = sfi
mode = self.p2 & 0x7
if mode in [0x4, 0x5]:
if mode == 0x4:
ret['mode'] = 'forward_search'
else:
ret['mode'] = 'backward_search'
ret['record_number'] = self.p1
self.col_id = '%02u' % ret['record_number']
elif mode == 6:
ret['mode'] = 'enhanced_search'
# TODO: further decode
elif mode == 7:
ret['mode'] = 'proprietary_search'
return ret
def _decode_cmd(self):
ret = self._decode_p1p2()
if self.cmd_data:
if ret['mode'] == 'enhanced_search':
ret['search_indication'] = b2h(self.cmd_data[:2])
ret['search_string'] = b2h(self.cmd_data[2:])
else:
ret['search_string'] = b2h(self.cmd_data)
return ret
def process_on_lchan(self, lchan):
self._determine_file(lchan)
return self.to_dict()
# TS 51.011 Section 9.2.8
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['A0']):
_apdu_case = 4
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
# TS 51.011 Section 9.2.9
class VerifyChv(ApduCommand, n='VERIFY CHV', ins=0x20, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
@staticmethod
def _pin_process(apdu):
processed = {
'scope': apdu.cmd_dict['p2']['scope'],
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
}
if apdu.lc == 0:
# this is just a question on the counters remaining
processed['mode'] = 'check_remaining_attempts'
else:
processed['pin'] = b2h(apdu.cmd_data)
if apdu.sw[0] == 0x63:
processed['remaining_attempts'] = apdu.sw[1] & 0xf
return processed
@staticmethod
def _pin_is_success(sw):
if sw[0] == 0x63:
return True
else:
return False
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 51.011 Section 9.2.10
class ChangeChv(ApduCommand, n='CHANGE CHV', ins=0x24, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 51.011 Section 9.2.11
class DisableChv(ApduCommand, n='DISABLE CHV', ins=0x26, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 51.011 Section 9.2.12
class EnableChv(ApduCommand, n='ENABLE CHV', ins=0x28, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 51.011 Section 9.2.13
class UnblockChv(ApduCommand, n='UNBLOCK CHV', ins=0x2C, cla=['A0']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 51.011 Section 9.2.14
class Invalidate(ApduCommand, n='INVALIDATE', ins=0x04, cla=['A0']):
_apdu_case = 1
_construct_p1 = BitStruct(BitsInteger(4),
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
path_from_mf=8, path_from_current_df=9))
# TS 51.011 Section 9.2.15
class Rehabilitate(ApduCommand, n='REHABILITATE', ins=0x44, cla=['A0']):
_apdu_case = 1
_construct_p1 = Invalidate._construct_p1
# TS 51.011 Section 9.2.16
class RunGsmAlgorithm(ApduCommand, n='RUN GSM ALGORITHM', ins=0x88, cla=['A0']):
_apdu_case = 4
_construct = Struct('rand'/HexAdapter(Bytes(16)))
_construct_rsp = Struct('sres'/HexAdapter(Bytes(4)), 'kc'/HexAdapter(Bytes(8)))
# TS 51.011 Section 9.2.17
class Sleep(ApduCommand, n='SLEEP', ins=0xFA, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.18
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.19
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['A0']):
_apdu_case = 3
# TS 51.011 Section 9.2.20
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['A0']):
_apdu_case = 4
# TS 51.011 Section 9.2.21
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['A0']):
_apdu_case = 2
# TS 51.011 Section 9.2.22
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['A0']):
_apdu_case = 3
ApduCommands = ApduCommandSet('TS 51.011', cmds=[SimSelect, SimStatus, ReadBinary, UpdateBinary, ReadRecord,
UpdateRecord, Seek, Increase, VerifyChv, ChangeChv, DisableChv,
EnableChv, UnblockChv, Invalidate, Rehabilitate, RunGsmAlgorithm,
Sleep, GetResponse, TerminalProfile, Envelope, Fetch, TerminalResponse])

View File

@@ -2,7 +2,7 @@ from construct.lib.containers import Container, ListContainer
from construct.core import EnumIntegerString
import typing
from construct import *
from construct.core import evaluate, bytes2integer, integer2bytes, BitwisableString
from construct.core import evaluate, BitwisableString
from construct.lib import integertypes
from pySim.utils import b2h, h2b, swap_nibbles
import gsm0338
@@ -219,7 +219,7 @@ class GreedyInteger(Construct):
if evaluate(self.swapped, context):
data = swapbytes(data)
try:
return bytes2integer(data, self.signed)
return int.from_bytes(data, byteorder='big', signed=self.signed)
except ValueError as e:
raise IntegerError(str(e), path=path)
@@ -248,7 +248,7 @@ class GreedyInteger(Construct):
raise IntegerError(f"value {obj} is not an integer", path=path)
length = self.__bytes_required(obj, self.minlen)
try:
data = integer2bytes(obj, length, self.signed)
data = obj.to_bytes(length, byteorder='big', signed=self.signed)
except ValueError as e:
raise IntegerError(str(e), path=path)
if evaluate(self.swapped, context):

View File

@@ -1,461 +0,0 @@
"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.construct import *
from pySim.utils import b2h
from pySim.sms import UserDataHeader
from construct import *
from bidict import bidict
import zlib
import abc
import struct
from typing import Optional
# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data
# CAT_TP TCP/IP SMS
# CPI 0x01 0x01 =IEIa=70,len=0
# CHI NULL NULL NULL
# CPI, CPL and CHL included in RC/CC/DS true true
# RPI 0x02 0x02 =IEIa=71,len=0
# RHI NULL NULL
# RPI, RPL and RHL included in RC/CC/DS true true
# packet-id 0-bf,ff 0-bf,ff
# identification packet false 102 225 tbl 6
# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
# TS 102 225 Table 5
ota_status_codes = bidict({
0x00: 'PoR OK',
0x01: 'RC/CC/DS failed',
0x02: 'CNTR low',
0x03: 'CNTR high',
0x04: 'CNTR blocked',
0x05: 'Ciphering error',
0x06: 'Unidentified security error',
0x07: 'Insufficient memory',
0x08: 'more time',
0x09: 'TAR unknown',
0x0a: 'Insufficient security level',
0x0b: 'Actual Response in SMS-SUBMIT', # 31.115
0x0c: 'Actual Response in USSD', # 31.115
})
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
insufficient_memory=7, more_time_needed=8, tar_unknown=9,
insufficient_security_level=0x0A,
actual_response_sms_submit=0x0B,
actual_response_ussd=0x0C)
# ETSI TS 102 226 Section 5.1.2
CompactRemoteResp = Struct('number_of_commands'/Int8ub,
'last_status_word'/HexAdapter(Bytes(2)),
'last_response_data'/HexAdapter(GreedyBytes))
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
SPI = BitStruct( # first octet
Padding(3),
'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
counter_must_be_higher=2, counter_must_be_lower=3),
'ciphering'/Flag,
'rc_cc_ds'/RC_CC_DS,
# second octet
Padding(2),
'por_in_submit'/Flag,
'por_shall_be_ciphered'/Flag,
'por_rc_cc_ds'/RC_CC_DS,
'por'/Enum(BitsInteger(2), no_por=0,
por_required=1, por_only_when_error=2)
)
# TS 102 225 Section 5.1.2
KIC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
aes_cbc=2)
)
# TS 102 225 Section 5.1.3.1
KID_CC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
aes_cmac=2)
)
# TS 102 225 Section 5.1.3.2
KID_RC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
)
SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
'cmd_hdr_len'/Int8ub,
'spi'/SPI,
'kic'/KIC,
'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
'tar'/Bytes(3),
'secured_data'/GreedyBytes)
class OtaKeyset:
"""The OTA related data (key material, counter) to be used in encrypt/decrypt."""
def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
self.algo_crypt = algo_crypt
self.kic = bytes(kic)
self.kic_idx = kic_idx
self.algo_auth = algo_auth
self.kid = bytes(kid)
self.kid_idx = kid_idx
self.cntr = cntr
@property
def auth(self):
"""Return an instance of the matching OtaAlgoAuth."""
return OtaAlgoAuth.fromKeyset(self)
@property
def crypt(self):
"""Return an instance of the matching OtaAlgoCrypt."""
return OtaAlgoCrypt.fromKeyset(self)
class OtaCheckError(Exception):
pass
class OtaDialect(abc.ABC):
"""Base Class for OTA dialects such as SMS, BIP, ..."""
def _compute_sig_len(self, spi:SPI):
if spi['rc_cc_ds'] == 'no_rc_cc_ds':
return 0
elif spi['rc_cc_ds'] == 'rc': # CRC-32
return 4
elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
# TODO: this is not entirely correct, as in AES case it could be 4 or 8
return 8
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
@abc.abstractmethod
def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
pass
@abc.abstractmethod
def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
"""Decode a response into a response packet and, if indicted (by a
response status of `"por_ok"`) a decoded response.
The response packet's common characteristics are not fully determined,
and (so far) completely proprietary per dialect."""
pass
from Crypto.Cipher import DES, DES3, AES
from Crypto.Hash import CMAC
class OtaAlgo(abc.ABC):
iv = b'\x00\x00\x00\x00\x00\x00\x00\x00'
blocksize = None
enum_name = None
@staticmethod
def _get_padding(in_len: int, multiple: int, padding: int = 0):
"""Return padding bytes towards multiple of N."""
if in_len % multiple == 0:
return b''
pad_cnt = multiple - (in_len % multiple)
return b'\x00' * pad_cnt
@staticmethod
def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
"""Pad input bytes to multiple of N."""
return indat + OtaAlgo._get_padding(len(indat), multiple, padding)
def pad_to_blocksize(self, indat: bytes, padding: int = 0):
"""Pad the given input data to multiple of the cipher block size."""
return self._pad_to_multiple(indat, self.blocksize, padding)
def __init__(self, otak: OtaKeyset):
self.otak = otak
def __str__(self):
return self.__class__.__name__
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
def __init__(self, otak: OtaKeyset):
if self.enum_name != otak.algo_crypt:
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
super().__init__(otak)
def encrypt(self, data:bytes) -> bytes:
"""Encrypt given input bytes using the key material given in constructor."""
padded_data = self.pad_to_blocksize(data)
return self._encrypt(data)
def decrypt(self, data:bytes) -> bytes:
"""Decrypt given input bytes using the key material given in constructor."""
return self._decrypt(data)
@abc.abstractmethod
def _encrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@abc.abstractmethod
def _decrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@classmethod
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
"""Resolve the class for the encryption algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_crypt:
return subc(otak)
raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth)
class OtaAlgoAuth(OtaAlgo, abc.ABC):
def __init__(self, otak: OtaKeyset):
if self.enum_name != otak.algo_auth:
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
super().__init__(otak)
def sign(self, data:bytes) -> bytes:
"""Compute the CC/CR check bytes for the input data using key material
given in constructor."""
padded_data = self.pad_to_blocksize(data)
sig = self._sign(padded_data)
return sig
def check_sig(self, data:bytes, cc_received:bytes):
"""Compute the CC/CR check bytes for the input data and compare against cc_received."""
cc = self.sign(data)
if cc_received != cc:
raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))
@abc.abstractmethod
def _sign(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@classmethod
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
"""Resolve the class for the authentication algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_auth:
return subc(otak)
raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
class OtaAlgoCryptDES(OtaAlgoCrypt):
"""DES is insecure. For backwards compatibility with pre-Rel8"""
name = 'DES'
enum_name = 'single_des'
blocksize = 8
def _encrypt(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthDES(OtaAlgoAuth):
"""DES is insecure. For backwards compatibility with pre-Rel8"""
name = 'DES'
enum_name = 'single_des'
blocksize = 8
def _sign(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv)
ciph = cipher.encrypt(data)
return ciph[len(ciph) - 8:]
class OtaAlgoCryptDES3(OtaAlgoCrypt):
name = '3DES'
enum_name = 'triple_des_cbc2'
blocksize = 8
def _encrypt(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthDES3(OtaAlgoAuth):
name = '3DES'
enum_name = 'triple_des_cbc2'
blocksize = 8
def _sign(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv)
ciph = cipher.encrypt(data)
return ciph[len(ciph) - 8:]
class OtaAlgoCryptAES(OtaAlgoCrypt):
name = 'AES'
enum_name = 'aes_cbc'
blocksize = 16 # TODO: is this needed?
def _encrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthAES(OtaAlgoAuth):
name = 'AES'
enum_name = 'aes_cmac'
blocksize = 16 # TODO: is this needed?
def _sign(self, data:bytes) -> bytes:
cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
cmac.update(data)
ciph = cmac.digest()
return ciph[len(ciph) - 8:]
class OtaDialectSms(OtaDialect):
"""OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
SmsResponsePacket = Struct('rpl'/Int16ub,
'rhl'/Int8ub,
'tar'/Bytes(3),
'cntr'/Bytes(5),
'pcntr'/Int8ub,
'response_status'/ResponseStatus,
'cc_rc'/Bytes(this.rhl-10),
'secured_data'/GreedyBytes)
def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
# length of signature in octets
len_sig = self._compute_sig_len(spi)
pad_cnt = 0
if spi['ciphering']: # ciphering is requested
# append padding bytes to end up with blocksize
len_cipher = 6 + len_sig + len(apdu)
padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
pad_cnt = len(padding)
apdu += padding
kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}
# CHL = number of octets from (and including) SPI to the end of RC/CC/DS
# 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
chl = 13 + len_sig
# CHL + SPI (+ KIC + KID)
c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
#print("part_head: %s" % b2h(part_head))
# CNTR + PCNTR (CNTR not used)
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
#print("part_cnt: %s" % b2h(part_cnt))
envelope_data = part_head + part_cnt + apdu
#print("envelope_data: %s" % b2h(envelope_data))
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
# CPL from and including CPI to end of secured data, including any padding for ciphering
cpl = len(envelope_data) + len_sig
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
#print("envelope_data with cpl: %s" % b2h(envelope_data))
if spi['rc_cc_ds'] == 'cc':
cc = otak.auth.sign(envelope_data)
envelope_data = part_cnt + cc + apdu
elif spi['rc_cc_ds'] == 'rc':
# CRC32
crc32 = zlib.crc32(envelope_data) & 0xffffffff
envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
envelope_data = part_cnt + apdu
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
#print("envelope_data with sig: %s" % b2h(envelope_data))
# encrypt as needed
if spi['ciphering']: # ciphering is requested
ciph = otak.crypt.encrypt(envelope_data)
envelope_data = part_head + ciph
# prefix with another CPL
cpl = len(envelope_data)
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
else:
envelope_data = part_head + envelope_data
#print("envelope_data: %s" % b2h(envelope_data))
return envelope_data
def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
if isinstance(data, str):
data = h2b(data)
# plain-text POR: 027100000e0ab000110000000000000001612f
# UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
# 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
# POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
# POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
if data[0] != 0x02:
raise ValueError('Unexpected UDL=0x%02x' % data[0])
udhd, remainder = UserDataHeader.fromBytes(data)
if not udhd.has_ie(0x71):
raise ValueError('RPI 0x71 not found in UDH')
rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered
res = self.SmsResponsePacket.parse(remainder)
if spi['por_shall_be_ciphered']:
# decrypt
ciphered_part = remainder[6:]
deciph = otak.crypt.decrypt(ciphered_part)
temp_data = rph_rhl_tar + deciph
res = self.SmsResponsePacket.parse(temp_data)
# remove specified number of padding bytes, if any
if res['pcntr'] != 0:
# this conditional is needed as python [:-0] renders an empty return!
res['secured_data'] = res['secured_data'][:-res['pcntr']]
remainder = temp_data
# is there a CC/RC present?
len_sig = res['rhl'] - 10
if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
if len_sig:
raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
elif spi['por_rc_cc_ds'] == 'cc':
# verify signature
# UDH is part of CC/RC!
udh = data[:3]
# RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC
rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
# remove the CC/RC bytes
temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
cc = otak.auth.check_sig(temp_data, res['cc_rc'])
# TODO: CRC
else:
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
if res.response_status == 'por_ok':
dec = CompactRemoteResp.parse(res['secured_data'])
else:
dec = None
return (res, dec)

View File

@@ -1,394 +0,0 @@
"""Code related to SMS Encoding/Decoding"""
# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
# module to python3, and I gave up after >= 3 hours of trying and failing to do so
# (C) 2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import typing
import abc
from pprint import pprint as pp
from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
from construct import Struct, Enum, Tell, BitStruct, this, Padding
from construct import Prefixed, GreedyRange, GreedyBytes
from pySim.construct import HexAdapter, BcdAdapter, TonNpi
from pySim.utils import Hexstr, h2b, b2h
from smpp.pdu import pdu_types
BytesOrHex = typing.Union[Hexstr, bytes]
class UserDataHeader:
# a single IE in the user data header
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
# parser for the full UDH: Length octet followed by sequence of IEs
_construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
'data'/GreedyBytes)
def __init__(self, ies=[]):
self.ies = ies
def __repr__(self) -> str:
return 'UDH(%r)' % self.ies
def has_ie(self, iei:int) -> bool:
for ie in self.ies:
if ie['iei'] == iei:
return True
return False
@classmethod
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
if isinstance(inb, str):
inb = h2b(inb)
res = cls._construct.parse(inb)
return cls(res['ies']), res['data']
def toBytes(self) -> bytes:
return self._construct.build({'ies':self.ies, 'data':b''})
def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
return True
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
return True
if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
return True
else:
return False
def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
if not smpp_dcs_is_8bit(smpp_pdu.params['data_coding']):
raise ValueError('We only support 8bit coded SMS for now')
class AddressField:
"""Representation of an address field as used in SMS T-PDU."""
_construct = Struct('addr_len'/Int8ub,
'type_of_addr'/TonNpi,
'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
'tell'/Tell)
def __init__(self, digits, ton='unknown', npi='unknown'):
self.ton = ton
self.npi = npi
self.digits = digits
def __str__(self):
return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)
@classmethod
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
"""Construct an AddressField instance from the binary T-PDU address format."""
if isinstance(inb, str):
inb = h2b(inb)
res = cls._construct.parse(inb)
#pp(res)
#print("size: %s" % cls._construct.sizeof())
ton = res['type_of_addr']['type_of_number']
npi = res['type_of_addr']['numbering_plan_id']
# return resulting instance + remainder bytes
return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]
@classmethod
def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
"""Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
smpp_map_npi = {
'UNKNOWN': 'unknown',
'ISDN': 'isdn_e164',
'DATA': 'data_x121',
'TELEX': 'telex_f69',
'LAND_MOBILE': 'sc_specific6',
'NATIONAL': 'national',
'PRIVATE': 'private',
'ERMES': 'ermes',
}
smpp_map_ton = {
'UNKNOWN': 'unknown',
'INTERNATIONAL': 'international',
'NATIONAL': 'national',
'NETWORK_SPECIFIC': 'network_specific',
'SUBSCRIBER_NUMBER': 'short_code',
'ALPHANUMERIC': 'alphanumeric',
'ABBREVIATED': 'abbreviated',
}
# return the resulting instance
return cls(addr.decode('ascii'), smpp_map_ton[ton.name], smpp_map_npi[npi.name])
def toBytes(self) -> bytes:
"""Encode the AddressField into the binary representation as used in T-PDU."""
num_digits = len(self.digits)
if num_digits % 2:
self.digits += 'f'
d = {
'addr_len': num_digits,
'type_of_addr': {
'ext': True,
'type_of_number': self.ton,
'numbering_plan_id': self.npi,
},
'digits': self.digits,
}
return self._construct.build(d)
class SMS_TPDU(abc.ABC):
"""Base class for a SMS T-PDU."""
def __init__(self, **kwargs):
self.tp_mti = kwargs.get('tp_mti', None)
self.tp_rp = kwargs.get('tp_rp', False)
self.tp_udhi = kwargs.get('tp_udhi', False)
self.tp_pid = kwargs.get('tp_pid', None)
self.tp_dcs = kwargs.get('tp_dcs', None)
self.tp_udl = kwargs.get('tp_udl', None)
self.tp_ud = kwargs.get('tp_ud', None)
class SMS_DELIVER(SMS_TPDU):
"""Representation of a SMS-DELIVER T-PDU."""
flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, 'tp_sri'/Flag,
Padding(1), 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))
def __init__(self, **kwargs):
kwargs['tp_mti'] = 0
super().__init__(**kwargs)
self.tp_lp = kwargs.get('tp_lp', False)
self.tp_mms = kwargs.get('tp_mms', False)
self.tp_oa = kwargs.get('tp_oa', None)
self.tp_scts = kwargs.get('tp_scts', None)
self.tp_sri = kwargs.get('tp_sri', False)
def __repr__(self):
return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)
@classmethod
def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
if isinstance(inb, str):
inb = h2b(inb)
flags = inb[0]
d = SMS_DELIVER.flags_construct.parse(inb)
oa, remainder = AddressField.fromBytes(inb[1:])
d['tp_oa'] = oa
offset = 0
d['tp_pid'] = remainder[offset]
offset += 1
d['tp_dcs'] = remainder[offset]
offset += 1
# TODO: further decode
d['tp_scts'] = remainder[offset:offset+7]
offset += 7
d['tp_udl'] = remainder[offset]
offset += 1
d['tp_ud'] = remainder[offset:]
return cls(**d)
def toBytes(self) -> bytes:
"""Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
outb = bytearray()
d = {
'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
}
flags = SMS_DELIVER.flags_construct.build(d)
outb.extend(flags)
outb.extend(self.tp_oa.toBytes())
outb.append(self.tp_pid)
outb.append(self.tp_dcs)
outb.extend(self.tp_scts)
outb.append(self.tp_udl)
outb.extend(self.tp_ud)
return outb
@classmethod
def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu."""
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
return cls.fromSmppSubmit(cls, smpp_pdu)
else:
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
@classmethod
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
smpp_pdu.params['source_addr_ton'],
smpp_pdu.params['source_addr_npi'])
tp_ud = smpp_pdu.params['short_message']
d = {
'tp_lp': False,
'tp_mms': False,
'tp_oa': tp_oa,
'tp_scts': h2b('22705200000000'), # FIXME
'tp_sri': False,
'tp_rp': False,
'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
'tp_pid': smpp_pdu.params['protocol_id'],
'tp_dcs': 0xF6, # we only deal with binary SMS here
'tp_udl': len(tp_ud),
'tp_ud': tp_ud,
}
return cls(**d)
class SMS_SUBMIT(SMS_TPDU):
"""Representation of a SMS-DELIVER T-PDU."""
flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag,
'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))
def __init__(self, **kwargs):
kwargs['tp_mti'] = 1
super().__init__(**kwargs)
self.tp_rd = kwargs.get('tp_rd', False)
self.tp_vpf = kwargs.get('tp_vpf', 'none')
self.tp_srr = kwargs.get('tp_srr', False)
self.tp_mr = kwargs.get('tp_mr', None)
self.tp_da = kwargs.get('tp_da', None)
self.tp_vp = kwargs.get('tp_vp', None)
def __repr__(self):
return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)
@classmethod
def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
"""Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU."""
offset = 0
if isinstance(inb, str):
inb = h2b(inb)
d = SMS_SUBMIT.flags_construct.parse(inb)
offset += 1
d['tp_mr']= inb[offset]
offset += 1
da, remainder = AddressField.fromBytes(inb[2:])
d['tp_da'] = da
offset = 0
d['tp_pid'] = remainder[offset]
offset += 1
d['tp_dcs'] = remainder[offset]
offset += 1
if d['tp_vpf'] == 'none':
pass
elif d['tp_vpf'] == 'relative':
# TODO: further decode
d['tp_vp'] = remainder[offset:offset+1]
offset += 1
elif d['tp_vpf'] == 'enhanced':
# TODO: further decode
d['tp_vp'] = remainder[offset:offset+7]
offset += 7
pass
elif d['tp_vpf'] == 'absolute':
# TODO: further decode
d['tp_vp'] = remainder[offset:offset+7]
offset += 7
pass
else:
raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
d['tp_udl'] = remainder[offset]
offset += 1
d['tp_ud'] = remainder[offset:]
return cls(**d)
def toBytes(self) -> bytes:
"""Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
outb = bytearray()
d = {
'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
}
flags = SMS_SUBMIT.flags_construct.build(d)
outb.extend(flags)
outb.append(self.tp_mr)
outb.extend(self.tp_da.toBytes())
outb.append(self.tp_pid)
outb.append(self.tp_dcs)
if self.tp_vpf != 'none':
outb.extend(self.tp_vp)
outb.append(self.tp_udl)
outb.extend(self.tp_ud)
return outb
@classmethod
def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
"""Construct a SMS_DELIVER instance from the format used by smpp.pdu."""
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
return cls.fromSmppSubmit(cls, smpp_pdu)
else:
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
@classmethod
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
"""Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
smpp_pdu.params['dest_addr_ton'],
smpp_pdu.params['dest_addr_npi'])
tp_ud = smpp_pdu.params['short_message']
#vp_smpp = smpp_pdu.params['validity_period']
#if not vp_smpp:
# vpf = 'none'
d = {
'tp_rd': True if smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE' else False,
'tp_vpf': None, # vpf,
'tp_rp': False, # related to ['registered_delivery'] ?
'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
'tp_srr': True if smpp_pdu.params['registered_delivery'] else False,
'tp_mr': 0, # FIXME: sm_default_msg_id ?
'tp_da': tp_da,
'tp_pid': smpp_pdu.params['protocol_id'],
'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
'tp_vp': None, # FIXME: implement VPF conversion
'tp_udl': len(tp_ud),
'tp_ud': tp_ud,
}
return cls(**d)
def toSmpp(self) -> pdu_types.PDU:
"""Translate a SMS_DELIVER instance to a smpp.pdu.pdu_types.SubmitSM instance."""
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
if self.tp_rp:
repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
else:
repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
# we only deal with binary SMS here:
if self.tp_dcs != 0xF6:
raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
return pdu_types.SubmitSM(service_type='',
source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
source_addr='simcard',
dest_addr_ton=FIXME(self.tp_da.ton),
dest_addr_npi=FIXME(self.tp_da.npi),
destination_addr=self.tp_da.digits,
esm_class=esm_class,
protocol_id=self.tp_pid,
priority_flag=pdu_types.PriorityFlag.LEVEL_0,
#schedule_delivery_time,
#validity_period,
registered_delivery=reg_del,
replace_if_present_flag=repl_if,
data_coding=dc,
#sm_default_msg_id,
short_message=self.tp_ud)

View File

@@ -150,28 +150,6 @@ class EF_SIM_AUTH_KEY(TransparentEF):
HexAdapter(Bytes(16)))
)
class EF_HTTPS_CFG(TransparentEF):
def __init__(self, fid='6f2a', name='EF.HTTPS_CFG'):
super().__init__(fid, name=name, desc='HTTPS configuration')
class EF_HTTPS_KEYS(TransparentEF):
KeyRecord = Struct('security_domain'/Int8ub,
'key_type'/Enum(Int8ub, des=0x80, psk=0x85, aes=0x88),
'key_version'/Int8ub,
'key_id'/Int8ub,
'key_length'/Int8ub,
'key'/HexAdapter(Bytes(this.key_length)))
def __init__(self, fid='6f2b', name='EF.HTTPS_KEYS'):
super().__init__(fid, name=name, desc='HTTPS PSK and DEK keys')
self._construct = GreedyRange(self.KeyRecord)
class EF_HTTPS_POLL(TransparentEF):
TimeUnit = Enum(Int8ub, seconds=0, minutes=1, hours=2, days=3, ten_days=4)
def __init__(self, fid='6f2c', name='EF.HTTPS_POLL'):
super().__init__(fid, name=name, desc='HTTPS polling interval')
self._construct = Struct(Const(b'\x82'), 'time_unit'/self.TimeUnit, 'value'/Int8ub,
'adm_session_triggering_tlv'/HexAdapter(GreedyBytes))
class DF_SYSTEM(CardDF):
def __init__(self):
@@ -190,9 +168,6 @@ class DF_SYSTEM(CardDF):
EF_0348_COUNT(),
EF_GP_COUNT(),
EF_GP_DIV_DATA(),
EF_HTTPS_CFG(),
EF_HTTPS_KEYS(),
EF_HTTPS_POLL(),
]
self.add_files(files)

View File

@@ -3,11 +3,9 @@ pyserial
pytlv
cmd2==1.5
jsonpath-ng
construct
construct>=2.9.51
bidict
gsm0338
pyyaml>=5.1
termcolor
colorlog
pycryptodome
git+https://github.com/hologram-io/smpp.pdu

View File

@@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='pySim',
version='1.1',
version='1.0',
packages=['pySim', 'pySim.transport'],
url='https://osmocom.org/projects/pysim/wiki',
license='GPLv2',
@@ -14,13 +14,11 @@ setup(
"pytlv",
"cmd2 >= 1.3.0, < 2.0.0",
"jsonpath-ng",
"construct >= 2.9",
"construct >= 2.9.51",
"bidict",
"gsm0338",
"termcolor",
"colorlog",
"pycryptodome",
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
"colorlog"
],
scripts=[
'pySim-prog.py',

View File

@@ -1,245 +0,0 @@
#!/usr/bin/env python3
#
# Program to emulate the entire communication path SMSC-MSC-BSC-BTS-ME
# that is usually between an OTA backend and the SIM card. This allows
# to play with SIM OTA technology without using a mobile network or even
# a mobile phone.
#
# An external application must encode (and encrypt/sign) the OTA SMS
# and submit them via SMPP to this program, just like it would submit
# it normally to a SMSC (SMS Service Centre). The program then re-formats
# the SMPP-SUBMIT into a SMS DELIVER TPDU and passes it via an ENVELOPE
# APDU to the SIM card that is locally inserted into a smart card reader.
#
# The path from SIM to external OTA application works the opposite way.
import argparse
import logging
import colorlog
from pprint import pprint as pp
from twisted.protocols import basic
from twisted.internet import defer, endpoints, protocol, reactor, task
from twisted.cred.portal import IRealm
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.portal import Portal
from zope.interface import implementer
from smpp.twisted.config import SMPPServerConfig
from smpp.twisted.server import SMPPServerFactory, SMPPBindManager
from smpp.twisted.protocol import SMPPSessionStates, DataHandlerResponse
from smpp.pdu import pdu_types, operations, pdu_encoding
from pySim.sms import SMS_DELIVER, AddressField
from pySim.transport import LinkBase, ProactiveHandler, argparse_add_reader_args, init_reader
from pySim.commands import SimCardCommands
from pySim.cards import UsimCard
from pySim.exceptions import *
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload
from pySim.cat import DeviceIdentities, Address
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
# MSISDNs to use when generating proactive SMS messages
SIM_MSISDN='23'
ESME_MSISDN='12'
# HACK: we need some kind of mapping table between system_id and card-reader
# or actually route based on MSISDNs
hackish_global_smpp = None
class Proact(ProactiveHandler):
def __init__(self, smpp_factory):
self.smpp_factory = smpp_factory
@staticmethod
def _find_first_element_of_type(instlist, cls):
for i in instlist:
if isinstance(i, cls):
return i
return None
"""Call-back which the pySim transport core calls whenever it receives a
proactive command from the SIM."""
def handle_SendShortMessage(self, data):
"""Card requests sending a SMS."""
pp(data)
# Relevant parts in data: Address, SMS_TPDU
addr_ie = _find_first_element_of_type(data.children, Address)
sms_tpdu_ie = _find_first_element_of_type(data.children, SMS_TPDU)
raw_tpdu = sms_tpdu_ie.decoded['tpdu']
submit = SMS_SUBMIT.fromBytes(raw_tpdu)
self.send_sms_via_smpp(data)
def handle_OpenChannel(self, data):
"""Card requests opening a new channel via a UDP/TCP socket."""
pp(data)
pass
def handle_CloseChannel(self, data):
"""Close a channel."""
pp(data)
pass
def handleReceiveData(self, data):
"""Receive/read data from the socket."""
pp(data)
pass
def handleSendData(self, data):
"""Send/write data to the socket."""
pp(data)
pass
def getChannelStatus(self, data):
pp(data)
pass
def send_sms_via_smpp(self, data):
# while in a normal network the phone/ME would *submit* a message to the SMSC,
# we are actually emulating the SMSC itself, so we must *deliver* the message
# to the ESME
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
deliver = operations.DeliverSM(source_addr=SIM_MSISDN,
destination_addr=ESME_MSISDN,
esm_class=esm_class,
protocol_id=0x7F,
data_coding=dcs,
short_message=h2b(data))
hackish_global_smpp.sendDataRequest(deliver)
# # obtain the connection/binding of system_id to be used for delivering MO-SMS to the ESME
# connection = smpp_server.getBoundConnections[system_id].getNextBindingForDelivery()
# connection.sendDataRequest(deliver)
def dcs_is_8bit(dcs):
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
return True
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
return True
if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
return True
else:
return False
class MyServer:
@implementer(IRealm)
class SmppRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
return ('SMPP', avatarId, lambda: None)
def __init__(self, tcp_port:int = 2775, bind_ip = '::'):
smpp_config = SMPPServerConfig(msgHandler=self._msgHandler,
systems={'test': {'max_bindings': 2}})
portal = Portal(self.SmppRealm())
credential_checker = InMemoryUsernamePasswordDatabaseDontUse()
credential_checker.addUser('test', 'test')
portal.registerChecker(credential_checker)
self.factory = SMPPServerFactory(smpp_config, auth_portal=portal)
logger.info('Binding Virtual SMSC to TCP Port %u at %s' % (tcp_port, bind_ip))
smppEndpoint = endpoints.TCP6ServerEndpoint(reactor, tcp_port, interface=bind_ip)
smppEndpoint.listen(self.factory)
self.tp = self.scc = self.card = None
def connect_to_card(self, tp: LinkBase):
self.tp = tp
self.scc = SimCardCommands(self.tp)
self.card = UsimCard(self.scc)
# this should be part of UsimCard, but FairewavesSIM breaks with that :/
self.scc.cla_byte = "00"
self.scc.sel_ctrl = "0004"
self.card.read_aids()
self.card.select_adf_by_aid(adf='usim')
# FIXME: create a more realistic profile than ffffff
self.scc.terminal_profile('ffffff')
def _msgHandler(self, system_id, smpp, pdu):
# HACK: we need some kind of mapping table between system_id and card-reader
# or actually route based on MSISDNs
global hackish_global_smpp
hackish_global_smpp = smpp
#pp(pdu)
if pdu.id == pdu_types.CommandId.submit_sm:
return self.handle_submit_sm(system_id, smpp, pdu)
else:
logging.warning('Rejecting non-SUBMIT commandID')
return pdu_types.CommandStatus.ESME_RINVCMDID
def handle_submit_sm(self, system_id, smpp, pdu):
# check for valid data coding scheme + PID
if not dcs_is_8bit(pdu.params['data_coding']):
logging.warning('Rejecting non-8bit DCS')
return pdu_types.CommandStatus.ESME_RINVDCS
if pdu.params['protocol_id'] != 0x7f:
logging.warning('Rejecting non-SIM PID')
return pdu_types.CommandStatus.ESME_RINVDCS
# 1) build a SMS-DELIVER (!) from the SMPP-SUBMIT
tpdu = SMS_DELIVER.fromSmppSubmit(pdu)
print(tpdu)
# 2) wrap into the CAT ENVELOPE for SMS-PP-Download
tpdu_ie = SMS_TPDU(decoded={'tpdu': b2h(tpdu.toBytes())})
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
sms_dl = SMSPPDownload(children=[dev_ids, tpdu_ie])
# 3) send to the card
envelope_hex = b2h(sms_dl.to_tlv())
print("ENVELOPE: %s" % envelope_hex)
(data, sw) = self.scc.envelope(envelope_hex)
print("SW %s: %s" % (sw, data))
if sw == '9300':
# TODO send back RP-ERROR message with TP-FCS == 'SIM Application Toolkit Busy'
return pdu_types.CommandStatus.ESME_RSUBMITFAIL
elif sw == '9000' or sw[0:2] in ['6f', '62', '63']:
# data something like 027100000e0ab000110000000000000001612f or
# 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
# which is the user-data portion of the SMS starting with the UDH (027100)
# TODO: return the response back to the sender in an RP-ACK; PID/DCS like in CMD
deliver = operations.DeliverSM(service_type=pdu.params['service_type'],
source_addr_ton=pdu.params['dest_addr_ton'],
source_addr_npi=pdu.params['dest_addr_npi'],
source_addr=pdu.params['destination_addr'],
dest_addr_ton=pdu.params['source_addr_ton'],
dest_addr_npi=pdu.params['source_addr_npi'],
destination_addr=pdu.params['source_addr'],
esm_class=pdu.params['esm_class'],
protocol_id=pdu.params['protocol_id'],
priority_flag=pdu.params['priority_flag'],
data_coding=pdu.params['data_coding'],
short_message=h2b(data))
smpp.sendDataRequest(deliver)
return pdu_types.CommandStatus.ESME_ROK
else:
return pdu_types.CommandStatus.ESME_RSUBMITFAIL
option_parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argparse_add_reader_args(option_parser)
smpp_group = option_parser.add_argument_group('SMPP Options')
smpp_group.add_argument('--smpp-bind-port', type=int, default=2775,
help='TCP Port to bind the SMPP socket to')
smpp_group.add_argument('--smpp-bind-ip', default='::',
help='IPv4/IPv6 address to bind the SMPP socket to')
if __name__ == '__main__':
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
colorlog.basicConfig(level=logging.INFO, format = log_format)
logger = colorlog.getLogger()
opts = option_parser.parse_args()
#tp = init_reader(opts, proactive_handler = Proact())
tp = init_reader(opts)
if tp is None:
exit(1)
tp.connect()
ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip)
ms.connect_to_card(tp)
reactor.run()

View File

@@ -1,23 +0,0 @@
#!/usr/bin/env python3
from pySim.sms import *
from pprint import pprint as pp
from construct import setGlobalPrintPrivateEntries
print(UserDataHeader.fromBytes('027100'))
print(UserDataHeader.fromBytes('027100abcdef'))
print(UserDataHeader.fromBytes('03710110'))
print(UserDataHeader.fromBytes('0571007001ffabcd'))
setGlobalPrintPrivateEntries(True)
pp(AddressField.fromBytes('0480214399'))
s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
pp(s)
print(s.tp_da)
pp(b2h(s.toBytes()))
d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
pp(d)
pp(b2h(d.toBytes()))

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
import unittest
from pySim.utils import h2b, b2h
from pySim.ota import *
class Test_SMS_3DES(unittest.TestCase):
tar = h2b('b00000')
"""Test the OtaDialectSms for 3DES algorithms."""
def __init__(self, foo, **kwargs):
super().__init__(foo, **kwargs)
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
self.od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
self.dialect = OtaDialectSms()
self.spi_base = {
'counter':'no_counter',
'ciphering': True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por': 'por_required',
'por_shall_be_ciphered': True,
'por_rc_cc_ds': 'cc',
}
def _check_response(self, r):
self.assertEqual(r['number_of_commands'], 1)
self.assertEqual(r['last_status_word'], '612f')
self.assertEqual(r['last_response_data'], u'')
self.assertEqual(r['response_status'], 'por_ok')
def test_resp_3des_ciphered(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = True
spi['por_rc_cc_ds'] = 'cc'
r = self.dialect.decode_resp(self.od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
self._check_response(r)
def test_resp_3des_signed(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'cc'
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
self._check_response(r)
def test_resp_3des_signed_err(self):
"""Expect an OtaCheckError exception if the computed CC != received CC"""
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'cc'
with self.assertRaises(OtaCheckError) as context:
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb02612f')
self.assertTrue('!= Computed CC' in str(context.exception))
def test_resp_3des_none(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.decode_resp(self.od, spi, '027100000e0ab000110000000000000001612f')
self._check_response(r)
def test_cmd_3des_ciphered(self):
spi = self.spi_base
spi['ciphering'] = True
spi['rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '00180d04193535b000000c8478b552a4ffc5a8f099b83cad7123')
def test_cmd_3des_signed(self):
spi = self.spi_base
spi['ciphering'] = False
spi['rc_cc_ds'] = 'cc'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '1502193535b00000000000000000072ea17bdb72060e00a40000023f00')
def test_cmd_3des_none(self):
spi = self.spi_base
spi['ciphering'] = False
spi['rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '0d00193535b0000000000000000000a40000023f00')

View File

@@ -1,105 +0,0 @@
#!/usr/bin/env python3
import unittest
from pySim.utils import h2b, b2h
from pySim.sms import *
class Test_SMS_UDH(unittest.TestCase):
def test_single_ie(self):
udh, tail = UserDataHeader.fromBytes('027100')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
self.assertEqual(ie.length, 0)
self.assertEqual(ie.value, b'')
self.assertEqual(tail, b'')
def test_single_ie_tail(self):
udh, tail = UserDataHeader.fromBytes('027100abcdef')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
self.assertEqual(ie.length, 0)
self.assertEqual(ie.value, b'')
self.assertEqual(tail, b'\xab\xcd\xef')
def test_single_ie_value(self):
udh, tail = UserDataHeader.fromBytes('03710110')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
self.assertEqual(ie.length, 1)
self.assertEqual(ie.value, b'\x10')
self.assertEqual(tail, b'')
def test_two_ie_data_tail(self):
udh, tail = UserDataHeader.fromBytes('0571007001ffabcd')
self.assertEqual(len(udh.ies), 2)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
self.assertEqual(ie.length, 0)
self.assertEqual(ie.value, b'')
ie = udh.ies[1]
self.assertEqual(ie.iei, 0x70)
self.assertEqual(ie.length, 1)
self.assertEqual(ie.value, b'\xff')
self.assertEqual(tail, b'\xab\xcd')
def test_toBytes(self):
indata = h2b('0571007001ff')
udh, tail = UserDataHeader.fromBytes(indata)
encoded = udh.toBytes()
self.assertEqual(encoded, indata)
class Test_AddressField(unittest.TestCase):
def test_fromBytes(self):
encoded = h2b('0480214399')
af, trailer = AddressField.fromBytes(encoded)
self.assertEqual(trailer, b'\x99')
self.assertEqual(af.ton, 'unknown')
self.assertEqual(af.npi, 'unknown')
self.assertEqual(af.digits, '1234')
def test_fromBytes_odd(self):
af, trailer = AddressField.fromBytes('038021f399')
self.assertEqual(trailer, b'\x99')
self.assertEqual(af.ton, 'unknown')
self.assertEqual(af.npi, 'unknown')
self.assertEqual(af.digits, '123')
def test_toBytes(self):
encoded = h2b('04802143')
af, trailer = AddressField.fromBytes(encoded)
self.assertEqual(af.toBytes(), encoded)
def test_toBytes_odd(self):
af = AddressField('12345', 'international', 'isdn_e164')
encoded = af.toBytes()
self.assertEqual(encoded, h2b('05912143f5'))
class Test_SUBMIT(unittest.TestCase):
def test_fromBytes(self):
s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
self.assertEqual(s.tp_mti, 1)
self.assertEqual(s.tp_rd, True)
self.assertEqual(s.tp_vpf, 'relative')
self.assertEqual(s.tp_rp, False)
self.assertEqual(s.tp_udhi, True)
self.assertEqual(s.tp_srr, False)
self.assertEqual(s.tp_pid, 0)
self.assertEqual(s.tp_dcs, 0xf5)
self.assertEqual(s.tp_udl, 140)
class Test_DELIVER(unittest.TestCase):
def test_fromBytes(self):
d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
self.assertEqual(d.tp_mti, 0)
self.assertEqual(d.tp_mms, True)
self.assertEqual(d.tp_lp, False)
self.assertEqual(d.tp_rp, False)
self.assertEqual(d.tp_udhi, False)
self.assertEqual(d.tp_sri, False)
self.assertEqual(d.tp_pid, 0x7f)
self.assertEqual(d.tp_dcs, 0xf6)
self.assertEqual(d.tp_udl, 8)

View File

@@ -1,301 +0,0 @@
#!/usr/bin/env python3
#
# This program receive APDUs via the VPCD protocol of Frank Morgner's
# virtualsmartcard, encrypts them with OTA (over the air) keys and
# forwards them via SMPP to a SMSC (SMS service centre).
#
# In other words, you can use it as a poor man's OTA server, to enable
# you to use unmodified application software with PC/SC support to talk
# securely via OTA with a remote SMS card.
#
# This is very much a work in progress at this point.
#######################################################################
# twisted VPCD Library
#######################################################################
import logging
import struct
import abc
from typing import Union, Optional
from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from pySim.utils import b2h, h2b
logger = logging.getLogger(__name__)
class VirtualCard(abc.ABC):
"""Abstract base class for a virtual smart card."""
def __init__(self, atr: Union[str, bytes]):
if isinstance(atr, str):
atr = h2b(atr)
self.atr = atr
@abc.abstractmethod
def power_change(self, new_state: bool):
"""Power the card on or off."""
pass
@abc.abstractmethod
def reset(self):
"""Reset the card."""
pass
@abc.abstractmethod
def rx_c_apdu(self, apdu: bytes):
"""Receive a C-APDU from the reader/application."""
pass
def tx_r_apdu(self, apdu: Union[str, bytes]):
if isinstance(apdu, str):
apdu = h2b(apdu)
logger.info("R-APDU: %s" % b2h(apdu))
self.protocol.send_data(apdu)
class VpcdProtocolBase(Protocol):
# Prefixed couldn't be used as the this.length wouldn't be available in this case
construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
'data'/If(this.length > 1, Bytes(this.length)),
'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
def __init__(self, vcard: VirtualCard):
self.recvBuffer = b''
self.connectionCorrupted = False
self.pduReadTimer = None
self.pduReadTimerSecs = 10
self.callLater = reactor.callLater
self.on = False
self.vcard = vcard
self.vcard.protocol = self
def dataReceived(self, data: bytes):
"""entry point where twisted tells us data was received."""
#logger.debug('Data received: %s' % b2h(data))
self.recvBuffer = self.recvBuffer + data
while True:
if self.connectionCorrupted:
return
msg = self.readMessage()
if msg is None:
break
self.endPDURead()
self.rawMessageReceived(msg)
if len(self.recvBuffer) > 0:
self.incompletePDURead()
def incompletePDURead(self):
"""We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
if self.pduReadTimer and self.pduReadTimer.active():
return
self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
def endPDURead(self):
"""We completed reading a PDU, cancel the pduReadTimer."""
if self.pduReadTimer and self.pduReadTimer.active():
self.pduReadTimer.cancel()
def readMessage(self) -> Optional[bytes]:
"""read an entire [raw] message."""
pduLen = self._getMessageLength()
if pduLen is None:
return None
return self._getMessage(pduLen)
def _getMessageLength(self) -> Optional[int]:
if len(self.recvBuffer) < 2:
return None
return struct.unpack('!H', self.recvBuffer[:2])[0]
def _getMessage(self, pduLen: int) -> Optional[bytes]:
if len(self.recvBuffer) < pduLen+2:
return None
message = self.recvBuffer[:pduLen+2]
self.recvBuffer = self.recvBuffer[pduLen+2:]
return message
def onPDUReadTimeout(self):
logger.error('PDU read timed out. Buffer is now considered corrupt')
#self.coruptDataReceived
def rawMessageReceived(self, message: bytes):
"""Called once a complete binary vpcd message has been received."""
pdu = None
try:
pdu = VpcdProtocolBase.construct.parse(message)
except Exception as e:
logger.exception(e)
logger.critical('Received corrupt PDU %s' % b2h(message))
#self.corupDataRecvd()
else:
self.PDUReceived(pdu)
def PDUReceived(self, pdu):
logger.debug("Rx PDU: %s" % pdu)
if pdu['data']:
return self.on_rx_data(pdu)
else:
method = getattr(self, 'on_rx_' + pdu['ctrl'])
return method(pdu)
def on_rx_atr(self, pdu):
self.send_data(self.vcard.atr)
def on_rx_on(self, pdu):
if self.on:
return
else:
self.on = True
self.vcard.power_change(self.on)
def on_rx_reset(self, pdu):
self.vcard.reset()
def on_rx_off(self, pdu):
if not self.on:
return
else:
self.on = False
self.vcard.power_change(self.on)
def on_rx_data(self, pdu):
self.vcard.rx_c_apdu(pdu['data'])
def send_pdu(self, pdu):
logger.debug("Sending PDU: %s" % pdu)
encoded = VpcdProtocolBase.construct.build(pdu)
#logger.debug("Sending binary: %s" % b2h(encoded))
self.transport.write(encoded)
def send_data(self, data: Union[str, bytes]):
if isinstance(data, str):
data = h2b(data)
return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
def send_ctrl(self, ctrl: str):
return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
class VpcdProtocolClient(VpcdProtocolBase):
pass
class VpcdClientFactory(ReconnectingClientFactory):
def __init__(self, vcard_class: VirtualCard):
self.vcard_class = vcard_class
def startedConnecting(self, connector):
logger.debug('Started to connect')
def buildProtocol(self, addr):
logger.info('Connection established to %s' % addr)
self.resetDelay()
return VpcdProtocolClient(vcard = self.vcard_class())
def clientConnectionLost(self, connector, reason):
logger.warning('Connection lost (reason: %s)' % reason)
super().clientConnectionLost(connector, reason)
def clientConnectionFailed(self, connector, reason):
logger.warning('Connection failed (reason: %s)' % reason)
super().clientConnectionFailed(connector, reason)
#######################################################################
# Application
#######################################################################
from pprint import pprint as pp
from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
from twisted.internet import reactor
from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
from smpp.twisted.protocol import SMPPClientProtocol
from smpp.twisted.config import SMPPClientConfig
from smpp.pdu.operations import SubmitSM, DeliverSM
from smpp.pdu import pdu_types
from pySim.ota import OtaKeyset, OtaDialectSms
from pySim.utils import b2h, h2b
class MyVcard(VirtualCard):
def __init__(self, **kwargs):
super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
self.smpp_client = None
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
self.ota_dialect = OtaDialectSms()
self.tar = h2b('B00011')
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
def ensure_smpp(self):
config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
if self.smpp_client:
return
self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
smpp = self.smpp_client.connectAndBind()
#self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
#d = self.smpp.connectTCP(config.host, config.port)
#d = self.smpp.connectAndBind()
#d.addCallback(self.forwardToClient, self.smpp)
def power_change(self, new_state: bool):
if new_state:
logger.info("POWER ON")
self.ensure_smpp()
else:
logger.info("POWER OFF")
def reset(self):
logger.info("RESET")
def rx_c_apdu(self, apdu: bytes):
pp(self.smpp_client.smpp)
logger.info("C-APDU: %s" % b2h(apdu))
# translate to Secured OTA RFM
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
# add user data header
tpdu = b'\x02\x70\x00' + secured
# send via SMPP
self.tx_sms_tpdu(tpdu)
#self.tx_r_apdu('9000')
def tx_sms_tpdu(self, tpdu: bytes):
"""Send a SMS TPDU via SMPP SubmitSM."""
dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
protocol_id=0x7f, short_message=tpdu)
self.smpp_client.smpp.sendDataRequest(submit)
def handleSmpp(self, smpp, pdu):
#logger.info("Received SMPP %s" % pdu)
data = pdu.params['short_message']
#logger.info("Received SMS Data %s" % b2h(data))
r = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
logger.info("Decoded SMPP %s" % r)
self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
if __name__ == '__main__':
import logging
logger = logging.getLogger(__name__)
import colorlog
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
colorlog.basicConfig(level=logging.INFO, format = log_format)
logger = colorlog.getLogger()
from twisted.internet import reactor
host = 'localhost'
port = 35963
reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
reactor.run()