15 Commits

Author SHA1 Message Date
Vadim Yanitskiy
8f38800643 pySim-shell.py: make it work with cmd2 >= v2.4.0
In v2.3.0 both cmd2.{fg,bg} have been deprecated in favour of cmd2.{Fg,Bg}.
In v2.4.0 both cmd2.{fg,bg} have been removed.

See https://github.com/python-cmd2/cmd2/blob/master/CHANGELOG.md

Change-Id: I7ca95e85fc45ba66fd9ba6bea1fec2bae0e892c0
2022-09-05 23:27:43 +07:00
Vadim Yanitskiy
d5c1bec869 pySim-shell.py: make it work with cmd2 >= v2.0.0
* Argument 'use_ipython' was renamed to 'use_ipython'.
* Class 'Settable' requires the reference to the object that holds
  the settable attribute.

See https://github.com/python-cmd2/cmd2/releases/tag/2.0.0.

Change-Id: Ia38f0ca5c3f41395f8fe850adae37f5af4e3fe19
2022-09-05 23:27:43 +07:00
Vadim Yanitskiy
7d05e49f11 README.md: update installation instructions for Debian
Change-Id: Icefa33570a34960a4fff145f3c1b6585d867605c
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
98ea2a0f7a README.md: update git URLs (git -> https; gitea)
Change-Id: Ia86979f656557e442b0f432b0646aa7661c293e9
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
0a8d27ad7a README.md: list recent dependencies from requirements.txt
Change-Id: Ia486dbc7f630c1404e51728b5353cf5a0d643415
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
9550a0a45b README.md: fix module name: s/serial/pyserial/
Change-Id: I5fd308fb161cd5bd5f702845691296877e523248
2022-09-05 23:15:11 +07:00
Vadim Yanitskiy
b5eaf14991 README.md,requirements.txt: add missing construct version info
Change-Id: I90da0df431f0d7dbfa4aa428366fbf0e35db388f
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
bdac3f61be Bump minimum required construct version to v2.9.51
With this version I can get all unittests passing:

  python -m unittest discover tests/

We're passing argument 'path' to stream_read_entire(), which was
added in [1] and become available since v2.9.51.

Change-Id: I4223c83570d333ad8d79bc2aa2d8bcc580156cff
Related: [1] bfe71315b027e18e62f00ec4de75043992fd2316 construct.git
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
05d30eb666 construct: use Python's API for int<->bytes conversion
Argument 'signed' was added in [1] and become available since v2.10.63.
Therefore using bytes2integer() and integer2bytes() from construct.core
bumps the minimum required version of construct to v2.10.63.  For
instance, debian:bullseye currently ships v2.10.58.

There is no strict requirement to use construct's API, so let's use
Python's API instead.  This allows using older construct versions
from the v2.9.xx family.

Change-Id: I613dbfebe993f9c19003635371941710fc1b1236
Related: [1] 660ddbe2d9a351731ad7976351adbf413809a715 construct.git
Related: OS#5666
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
7800f9d356 contrib/jenkins.sh: install dependencies from requirements.txt
Change-Id: I99af496e9a3758ea624ca484f4fbc51b262ffaf4
2022-09-05 23:15:10 +07:00
Vadim Yanitskiy
7ce04a5a29 contrib/jenkins.sh: execute this script with -x and -e
-x  Print commands and their arguments as they are executed
  -e  Exit immediately if a command exits with a non-zero status

Change-Id: I13af70ef770936bec00b050b6c4f988e53ee2833
2022-09-05 23:15:06 +07:00
Vadim Yanitskiy
b3ea021b32 contrib/jenkins.sh: speed up pylint by running multiple processes
Use multiple processes to speed up pylint.  Specifying -j0 will
auto-detect the number of processors available to use.

On AMD Ryzen 7 3700X this significantly reduces the exec time:

  $ time python -m pylint -j1 ... pySim *.py
  real    0m12.409s
  user    0m12.149s
  sys     0m0.136s

  $ time python -m pylint -j0 ... pySim *.py
  real    0m5.541s
  user    0m58.496s
  sys     0m1.213s

Change-Id: I76d1696c27ddcab358526f807c4a0a7f0d4c85d4
2022-08-30 17:15:53 +07:00
Vadim Yanitskiy
12175d3588 contrib/jenkins.sh: pylint v2.15 is unstable, pin v2.14.5
pylint v2.15 is crashing, let's fall-back to a known to work v2.14.5.

Change-Id: Ie29be6ec6631ff2b3d8cd6b2dd9ac0ed8f505e4f
Related: https://github.com/PyCQA/pylint/issues/7375
Related: OS#5668
2022-08-30 17:12:03 +07:00
Christian Amsüss
59f3b1154f proactive: Send a Terminal Response automatically after a Fetch
Change-Id: I43bc994e7517b5907fb40a98d84797c54056c47d
2022-08-21 11:54:33 +00:00
Christian Amsüss
98552ef1bd proactive: Avoid clobbering the output of the command that triggered the FETCH
Change-Id: I2b794a5c5bc808b9703b4bc679c119341a0ed41c
2022-08-21 11:54:00 +00:00
10 changed files with 96 additions and 659 deletions

View File

@@ -23,10 +23,10 @@ Git Repository
You can clone from the official Osmocom git repository using
```
git clone git://git.osmocom.org/pysim.git
git clone https://gitea.osmocom.org/sim-card/pysim.git
```
There is a cgit interface at <https://git.osmocom.org/pysim>
There is a web interface at <https://gitea.osmocom.org/sim-card/pysim>.
Installation
@@ -35,18 +35,26 @@ Installation
Please install the following dependencies:
- pyscard
- serial
- pyserial
- pytlv
- cmd2 >= 1.3.0 but < 2.0.0
- jsonpath-ng
- construct
- construct >= 2.9.51
- bidict
- gsm0338
- pyyaml >= 5.1
- termcolor
- colorlog
Example for Debian:
```
apt-get install python3-pyscard python3-serial python3-pip python3-yaml python3-termcolor python3-colorlog
pip3 install -r requirements.txt
```sh
sudo apt-get install --no-install-recommends \
pcscd libpcsclite-dev \
python3 \
python3-setuptools \
python3-pyscard \
python3-pip
pip3 install --user -r requirements.txt
```
After installing all dependencies, the pySim applications ``pySim-read.py``, ``pySim-prog.py`` and ``pySim-shell.py`` may be started directly from the cloned repository.

View File

@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/sh -xe
# jenkins build helper script for pysim. This is how we build on jenkins.osmocom.org
#
# environment variables:
@@ -6,8 +6,6 @@
# * PUBLISH: upload manuals after building if set to "1" (ignored without WITH_MANUALS = "1")
#
set -e
if [ ! -d "./pysim-testdata/" ] ; then
echo "###############################################"
echo "Please call from pySim-prog top directory"
@@ -17,16 +15,7 @@ fi
virtualenv -p python3 venv --system-site-packages
. venv/bin/activate
pip install pytlv
pip install 'pyyaml>=5.1'
pip install cmd2==1.5
pip install jsonpath-ng
pip install construct
pip install bidict
pip install gsm0338
pip install termcolor
pip install colorlog
pip install pycryptodome
pip install -r requirements.txt
# Execute automatically discovered unit tests first
python -m unittest discover -v -s tests/
@@ -37,8 +26,8 @@ python -m unittest discover -v -s tests/
# Ignore E0401: import-error
# pySim/utils.py:276: E0401: Unable to import 'Crypto.Cipher' (import-error)
# pySim/utils.py:277: E0401: Unable to import 'Crypto.Util.strxor' (import-error)
pip install pylint
python -m pylint --errors-only \
pip install pylint==2.14.5 # FIXME: 2.15 is crashing, see OS#5668
python -m pylint -j0 --errors-only \
--disable E1102 \
--disable E0401 \
--enable W0301 \

View File

@@ -23,7 +23,7 @@ import json
import traceback
import cmd2
from cmd2 import style, fg
from cmd2 import style, Fg
from cmd2 import CommandSet, with_default_category, with_argparser
import argparse
@@ -134,8 +134,8 @@ class PysimApp(cmd2.Cmd):
def __init__(self, card, rs, sl, ch, script=None):
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
use_ipython=True, auto_load_commands=False, startup_script=script)
self.intro = style('Welcome to pySim-shell!', fg=fg.red)
auto_load_commands=False, startup_script=script)
self.intro = style('Welcome to pySim-shell!', fg=Fg.RED)
self.default_category = 'pySim-shell built-in commands'
self.card = None
self.rs = None
@@ -145,16 +145,16 @@ class PysimApp(cmd2.Cmd):
self.ch = ch
self.numeric_path = False
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names',
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names', self,
onchange_cb=self._onchange_numeric_path))
self.conserve_write = True
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write',
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write', self,
onchange_cb=self._onchange_conserve_write))
self.json_pretty_print = True
self.add_settable(cmd2.Settable('json_pretty_print',
bool, 'Pretty-Print JSON output'))
bool, 'Pretty-Print JSON output', self))
self.apdu_trace = False
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card',
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card', self,
onchange_cb=self._onchange_apdu_trace))
self.equip(card, rs)
@@ -287,23 +287,23 @@ class PysimApp(cmd2.Cmd):
sys.stderr = self._stderr_backup
def _show_failure_sign(self):
self.poutput(style(" +-------------+", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ### +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" + ## ## +", fg=fg.bright_red))
self.poutput(style(" +-------------+", fg=fg.bright_red))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ### +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
self.poutput("")
def _show_success_sign(self):
self.poutput(style(" +-------------+", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" + # ## +", fg=fg.bright_green))
self.poutput(style(" + ## # +", fg=fg.bright_green))
self.poutput(style(" + ## +", fg=fg.bright_green))
self.poutput(style(" +-------------+", fg=fg.bright_green))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + # ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## # +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
self.poutput("")
def _process_card(self, first, script_path):

View File

@@ -2,7 +2,7 @@ from construct.lib.containers import Container, ListContainer
from construct.core import EnumIntegerString
import typing
from construct import *
from construct.core import evaluate, bytes2integer, integer2bytes, BitwisableString
from construct.core import evaluate, BitwisableString
from construct.lib import integertypes
from pySim.utils import b2h, h2b, swap_nibbles
import gsm0338
@@ -219,7 +219,7 @@ class GreedyInteger(Construct):
if evaluate(self.swapped, context):
data = swapbytes(data)
try:
return bytes2integer(data, self.signed)
return int.from_bytes(data, byteorder='big', signed=self.signed)
except ValueError as e:
raise IntegerError(str(e), path=path)
@@ -248,7 +248,7 @@ class GreedyInteger(Construct):
raise IntegerError(f"value {obj} is not an integer", path=path)
length = self.__bytes_required(obj, self.minlen)
try:
data = integer2bytes(obj, length, self.signed)
data = obj.to_bytes(length, byteorder='big', signed=self.signed)
except ValueError as e:
raise IntegerError(str(e), path=path)
if evaluate(self.swapped, context):

View File

@@ -1,461 +0,0 @@
"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.construct import *
from pySim.utils import b2h
from pySim.sms import UserDataHeader
from construct import *
from bidict import bidict
import zlib
import abc
import struct
from typing import Optional
# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data
# CAT_TP TCP/IP SMS
# CPI 0x01 0x01 =IEIa=70,len=0
# CHI NULL NULL NULL
# CPI, CPL and CHL included in RC/CC/DS true true
# RPI 0x02 0x02 =IEIa=71,len=0
# RHI NULL NULL
# RPI, RPL and RHL included in RC/CC/DS true true
# packet-id 0-bf,ff 0-bf,ff
# identification packet false 102 225 tbl 6
# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
# TS 102 225 Table 5
ota_status_codes = bidict({
0x00: 'PoR OK',
0x01: 'RC/CC/DS failed',
0x02: 'CNTR low',
0x03: 'CNTR high',
0x04: 'CNTR blocked',
0x05: 'Ciphering error',
0x06: 'Unidentified security error',
0x07: 'Insufficient memory',
0x08: 'more time',
0x09: 'TAR unknown',
0x0a: 'Insufficient security level',
0x0b: 'Actual Response in SMS-SUBMIT', # 31.115
0x0c: 'Actual Response in USSD', # 31.115
})
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
insufficient_memory=7, more_time_needed=8, tar_unknown=9,
insufficient_security_level=0x0A,
actual_response_sms_submit=0x0B,
actual_response_ussd=0x0C)
# ETSI TS 102 226 Section 5.1.2
CompactRemoteResp = Struct('number_of_commands'/Int8ub,
'last_status_word'/HexAdapter(Bytes(2)),
'last_response_data'/HexAdapter(GreedyBytes))
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
SPI = BitStruct( # first octet
Padding(3),
'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
counter_must_be_higher=2, counter_must_be_lower=3),
'ciphering'/Flag,
'rc_cc_ds'/RC_CC_DS,
# second octet
Padding(2),
'por_in_submit'/Flag,
'por_shall_be_ciphered'/Flag,
'por_rc_cc_ds'/RC_CC_DS,
'por'/Enum(BitsInteger(2), no_por=0,
por_required=1, por_only_when_error=2)
)
# TS 102 225 Section 5.1.2
KIC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
aes_cbc=2)
)
# TS 102 225 Section 5.1.3.1
KID_CC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
aes_cmac=2)
)
# TS 102 225 Section 5.1.3.2
KID_RC = BitStruct('key'/BitsInteger(4),
'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
)
SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
'cmd_hdr_len'/Int8ub,
'spi'/SPI,
'kic'/KIC,
'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
'tar'/Bytes(3),
'secured_data'/GreedyBytes)
class OtaKeyset:
"""The OTA related data (key material, counter) to be used in encrypt/decrypt."""
def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
self.algo_crypt = algo_crypt
self.kic = bytes(kic)
self.kic_idx = kic_idx
self.algo_auth = algo_auth
self.kid = bytes(kid)
self.kid_idx = kid_idx
self.cntr = cntr
@property
def auth(self):
"""Return an instance of the matching OtaAlgoAuth."""
return OtaAlgoAuth.fromKeyset(self)
@property
def crypt(self):
"""Return an instance of the matching OtaAlgoCrypt."""
return OtaAlgoCrypt.fromKeyset(self)
class OtaCheckError(Exception):
pass
class OtaDialect(abc.ABC):
"""Base Class for OTA dialects such as SMS, BIP, ..."""
def _compute_sig_len(self, spi:SPI):
if spi['rc_cc_ds'] == 'no_rc_cc_ds':
return 0
elif spi['rc_cc_ds'] == 'rc': # CRC-32
return 4
elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
# TODO: this is not entirely correct, as in AES case it could be 4 or 8
return 8
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
@abc.abstractmethod
def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
pass
@abc.abstractmethod
def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
"""Decode a response into a response packet and, if indicted (by a
response status of `"por_ok"`) a decoded response.
The response packet's common characteristics are not fully determined,
and (so far) completely proprietary per dialect."""
pass
from Crypto.Cipher import DES, DES3, AES
from Crypto.Hash import CMAC
class OtaAlgo(abc.ABC):
iv = b'\x00\x00\x00\x00\x00\x00\x00\x00'
blocksize = None
enum_name = None
@staticmethod
def _get_padding(in_len: int, multiple: int, padding: int = 0):
"""Return padding bytes towards multiple of N."""
if in_len % multiple == 0:
return b''
pad_cnt = multiple - (in_len % multiple)
return b'\x00' * pad_cnt
@staticmethod
def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
"""Pad input bytes to multiple of N."""
return indat + OtaAlgo._get_padding(len(indat), multiple, padding)
def pad_to_blocksize(self, indat: bytes, padding: int = 0):
"""Pad the given input data to multiple of the cipher block size."""
return self._pad_to_multiple(indat, self.blocksize, padding)
def __init__(self, otak: OtaKeyset):
self.otak = otak
def __str__(self):
return self.__class__.__name__
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
def __init__(self, otak: OtaKeyset):
if self.enum_name != otak.algo_crypt:
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
super().__init__(otak)
def encrypt(self, data:bytes) -> bytes:
"""Encrypt given input bytes using the key material given in constructor."""
padded_data = self.pad_to_blocksize(data)
return self._encrypt(data)
def decrypt(self, data:bytes) -> bytes:
"""Decrypt given input bytes using the key material given in constructor."""
return self._decrypt(data)
@abc.abstractmethod
def _encrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@abc.abstractmethod
def _decrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@classmethod
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
"""Resolve the class for the encryption algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_crypt:
return subc(otak)
raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth)
class OtaAlgoAuth(OtaAlgo, abc.ABC):
def __init__(self, otak: OtaKeyset):
if self.enum_name != otak.algo_auth:
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
super().__init__(otak)
def sign(self, data:bytes) -> bytes:
"""Compute the CC/CR check bytes for the input data using key material
given in constructor."""
padded_data = self.pad_to_blocksize(data)
sig = self._sign(padded_data)
return sig
def check_sig(self, data:bytes, cc_received:bytes):
"""Compute the CC/CR check bytes for the input data and compare against cc_received."""
cc = self.sign(data)
if cc_received != cc:
raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))
@abc.abstractmethod
def _sign(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@classmethod
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
"""Resolve the class for the authentication algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_auth:
return subc(otak)
raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
class OtaAlgoCryptDES(OtaAlgoCrypt):
"""DES is insecure. For backwards compatibility with pre-Rel8"""
name = 'DES'
enum_name = 'single_des'
blocksize = 8
def _encrypt(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthDES(OtaAlgoAuth):
"""DES is insecure. For backwards compatibility with pre-Rel8"""
name = 'DES'
enum_name = 'single_des'
blocksize = 8
def _sign(self, data:bytes) -> bytes:
cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv)
ciph = cipher.encrypt(data)
return ciph[len(ciph) - 8:]
class OtaAlgoCryptDES3(OtaAlgoCrypt):
name = '3DES'
enum_name = 'triple_des_cbc2'
blocksize = 8
def _encrypt(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthDES3(OtaAlgoAuth):
name = '3DES'
enum_name = 'triple_des_cbc2'
blocksize = 8
def _sign(self, data:bytes) -> bytes:
cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv)
ciph = cipher.encrypt(data)
return ciph[len(ciph) - 8:]
class OtaAlgoCryptAES(OtaAlgoCrypt):
name = 'AES'
enum_name = 'aes_cbc'
blocksize = 16 # TODO: is this needed?
def _encrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.encrypt(data)
def _decrypt(self, data:bytes) -> bytes:
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
return cipher.decrypt(data)
class OtaAlgoAuthAES(OtaAlgoAuth):
name = 'AES'
enum_name = 'aes_cmac'
blocksize = 16 # TODO: is this needed?
def _sign(self, data:bytes) -> bytes:
cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
cmac.update(data)
ciph = cmac.digest()
return ciph[len(ciph) - 8:]
class OtaDialectSms(OtaDialect):
"""OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
SmsResponsePacket = Struct('rpl'/Int16ub,
'rhl'/Int8ub,
'tar'/Bytes(3),
'cntr'/Bytes(5),
'pcntr'/Int8ub,
'response_status'/ResponseStatus,
'cc_rc'/Bytes(this.rhl-10),
'secured_data'/GreedyBytes)
def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
# length of signature in octets
len_sig = self._compute_sig_len(spi)
pad_cnt = 0
if spi['ciphering']: # ciphering is requested
# append padding bytes to end up with blocksize
len_cipher = 6 + len_sig + len(apdu)
padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
pad_cnt = len(padding)
apdu += padding
kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}
# CHL = number of octets from (and including) SPI to the end of RC/CC/DS
# 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
chl = 13 + len_sig
# CHL + SPI (+ KIC + KID)
c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
#print("part_head: %s" % b2h(part_head))
# CNTR + PCNTR (CNTR not used)
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
#print("part_cnt: %s" % b2h(part_cnt))
envelope_data = part_head + part_cnt + apdu
#print("envelope_data: %s" % b2h(envelope_data))
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
# CPL from and including CPI to end of secured data, including any padding for ciphering
cpl = len(envelope_data) + len_sig
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
#print("envelope_data with cpl: %s" % b2h(envelope_data))
if spi['rc_cc_ds'] == 'cc':
cc = otak.auth.sign(envelope_data)
envelope_data = part_cnt + cc + apdu
elif spi['rc_cc_ds'] == 'rc':
# CRC32
crc32 = zlib.crc32(envelope_data) & 0xffffffff
envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
envelope_data = part_cnt + apdu
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
#print("envelope_data with sig: %s" % b2h(envelope_data))
# encrypt as needed
if spi['ciphering']: # ciphering is requested
ciph = otak.crypt.encrypt(envelope_data)
envelope_data = part_head + ciph
# prefix with another CPL
cpl = len(envelope_data)
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
else:
envelope_data = part_head + envelope_data
#print("envelope_data: %s" % b2h(envelope_data))
return envelope_data
def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
if isinstance(data, str):
data = h2b(data)
# plain-text POR: 027100000e0ab000110000000000000001612f
# UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
# 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
# POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
# POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
if data[0] != 0x02:
raise ValueError('Unexpected UDL=0x%02x' % data[0])
udhd, remainder = UserDataHeader.fromBytes(data)
if not udhd.has_ie(0x71):
raise ValueError('RPI 0x71 not found in UDH')
rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered
res = self.SmsResponsePacket.parse(remainder)
if spi['por_shall_be_ciphered']:
# decrypt
ciphered_part = remainder[6:]
deciph = otak.crypt.decrypt(ciphered_part)
temp_data = rph_rhl_tar + deciph
res = self.SmsResponsePacket.parse(temp_data)
# remove specified number of padding bytes, if any
if res['pcntr'] != 0:
# this conditional is needed as python [:-0] renders an empty return!
res['secured_data'] = res['secured_data'][:-res['pcntr']]
remainder = temp_data
# is there a CC/RC present?
len_sig = res['rhl'] - 10
if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
if len_sig:
raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
elif spi['por_rc_cc_ds'] == 'cc':
# verify signature
# UDH is part of CC/RC!
udh = data[:3]
# RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC
rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
# remove the CC/RC bytes
temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
cc = otak.auth.check_sig(temp_data, res['cc_rc'])
# TODO: CRC
else:
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
if res.response_status == 'por_ok':
dec = CompactRemoteResp.parse(res['secured_data'])
else:
dec = None
return (res, dec)

View File

@@ -1,53 +0,0 @@
"""Code related to SMS Encoding/Decoding"""
# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
# module to python3, and I gave up after >= 3 hours of trying and failing to do so
# (C) 2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import typing
from construct import Int8ub, Bytes
from construct import Struct, Tell, this, RepeatUntil
from pySim.utils import Hexstr, h2b, b2h
BytesOrHex = typing.Union[Hexstr, bytes]
class UserDataHeader:
# a single IE in the user data header
ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length))
# parser for the full UDH: Length octet followed by sequence of IEs
_construct = Struct('udhl'/Int8ub,
# FIXME: somehow the below lambda is not working, we always only get the first IE?
'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c))
def __init__(self, ies=[]):
self.ies = ies
def __repr__(self) -> str:
return 'UDH(%r)' % self.ies
def has_ie(self, iei:int) -> bool:
for ie in self.ies:
if ie['iei'] == iei:
return True
return False
@classmethod
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
if isinstance(inb, str):
inb = h2b(inb)
res = cls._construct.parse(inb)
return cls(res['ies']), inb[1+res['udhl']:]

View File

@@ -10,7 +10,7 @@ from typing import Optional, Tuple
from pySim.exceptions import *
from pySim.construct import filter_dict
from pySim.utils import sw_match, b2h, h2b, i2h, Hexstr
from pySim.cat import ProactiveCommand
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
#
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
@@ -42,10 +42,7 @@ class ProactiveHandler(abc.ABC):
"""Abstract base class representing the interface of some code that handles
the proactive commands, as returned by the card in responses to the FETCH
command."""
def receive_fetch_raw(self, payload: Hexstr):
# parse the proactive command
pcmd = ProactiveCommand()
parsed = pcmd.from_tlv(h2b(payload))
def receive_fetch_raw(self, pcmd: ProactiveCommand, parsed: Hexstr):
# try to find a generic handler like handle_SendShortMessage
handle_name = 'handle_%s' % type(parsed).__name__
if hasattr(self, handle_name):
@@ -160,13 +157,57 @@ class LinkBase(abc.ABC):
sw : string (in hex) of status word (ex. "9000")
"""
rv = self.send_apdu(pdu)
last_sw = rv[1]
while sw == '9000' and sw_match(rv[1], '91xx'):
while sw == '9000' and sw_match(last_sw, '91xx'):
# It *was* successful after all -- the extra pieces FETCH handled
# need not concern the caller.
rv = (rv[0], '9000')
# proactive sim as per TS 102 221 Setion 7.4.2
rv = self.send_apdu_checksw('80120000' + rv[1][2:], sw)
print("FETCH: %s" % rv[0])
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
# Setting this in case we later decide not to send a terminal
# response immediately unconditionally -- the card may still have
# something pending even though the last command was not processed
# yet.
last_sw = fetch_rv[1]
# parse the proactive command
pcmd = ProactiveCommand()
parsed = pcmd.from_tlv(h2b(fetch_rv[0]))
print("FETCH: %s (%s)" % (fetch_rv[0], type(parsed).__name__))
result = Result()
if self.proactive_handler:
self.proactive_handler.receive_fetch_raw(rv[0])
# Extension point: If this does return a list of TLV objects,
# they could be appended after the Result; if the first is a
# Result, that cuold replace the one built here.
self.proactive_handler.receive_fetch_raw(pcmd, parsed)
result.from_dict({'general_result': 'performed_successfully', 'additional_information': ''})
else:
result.from_dict({'general_result': 'command_beyond_terminal_capability', 'additional_information': ''})
# Send response immediately, thus also flushing out any further
# proactive commands that the card already wants to send
#
# Structure as per TS 102 223 V4.4.0 Section 6.8
# The Command Details are echoed from the command that has been processed.
(command_details,) = [c for c in pcmd.decoded.children if isinstance(c, CommandDetails)]
# The Device Identities are fixed. (TS 102 223 V4.0.0 Section 6.8.2)
device_identities = DeviceIdentities()
device_identities.from_dict({'source_dev_id': 'terminal', 'dest_dev_id': 'uicc'})
# Testing hint: The value of tail does not influence the behavior
# of an SJA2 that sent ans SMS, so this is implemented only
# following TS 102 223, and not fully tested.
tail = command_details.to_tlv() + device_identities.to_tlv() + result.to_tlv()
# Testing hint: In contrast to the above, this part is positively
# essential to get the SJA2 to provide the later parts of a
# multipart SMS in response to an OTA RFM command.
terminal_response = '80140000' + b2h(len(tail).to_bytes(1, 'big') + tail)
terminal_response_rv = self.send_apdu(terminal_response)
last_sw = terminal_response_rv[1]
if not sw_match(rv[1], sw):
raise SwMatchError(rv[1], sw.lower(), self.sw_interpreter)
return rv

View File

@@ -3,10 +3,9 @@ pyserial
pytlv
cmd2==1.5
jsonpath-ng
construct
construct>=2.9.51
bidict
gsm0338
pyyaml>=5.1
termcolor
colorlog
pycryptodome

View File

@@ -14,12 +14,11 @@ setup(
"pytlv",
"cmd2 >= 1.3.0, < 2.0.0",
"jsonpath-ng",
"construct >= 2.9",
"construct >= 2.9.51",
"bidict",
"gsm0338",
"termcolor",
"colorlog",
"pycryptodome"
"colorlog"
],
scripts=[
'pySim-prog.py',

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
import unittest
from pySim.utils import h2b, b2h
from pySim.ota import *
class Test_SMS_3DES(unittest.TestCase):
tar = h2b('b00000')
"""Test the OtaDialectSms for 3DES algorithms."""
def __init__(self, foo, **kwargs):
super().__init__(foo, **kwargs)
# KIC1 + KID1 of 8988211000000467285
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
KID3 = h2b('12110C78E678C25408233076AA033615')
self.od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
self.dialect = OtaDialectSms()
self.spi_base = {
'counter':'no_counter',
'ciphering': True,
'rc_cc_ds': 'cc',
'por_in_submit':False,
'por': 'por_required',
'por_shall_be_ciphered': True,
'por_rc_cc_ds': 'cc',
}
def _check_response(self, r):
self.assertEqual(r['number_of_commands'], 1)
self.assertEqual(r['last_status_word'], '612f')
self.assertEqual(r['last_response_data'], u'')
self.assertEqual(r['response_status'], 'por_ok')
def test_resp_3des_ciphered(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = True
spi['por_rc_cc_ds'] = 'cc'
r = self.dialect.decode_resp(self.od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
self._check_response(r)
def test_resp_3des_signed(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'cc'
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
self._check_response(r)
def test_resp_3des_signed_err(self):
"""Expect an OtaCheckError exception if the computed CC != received CC"""
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'cc'
with self.assertRaises(OtaCheckError) as context:
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb02612f')
self.assertTrue('!= Computed CC' in str(context.exception))
def test_resp_3des_none(self):
spi = self.spi_base
spi['por_shall_be_ciphered'] = False
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.decode_resp(self.od, spi, '027100000e0ab000110000000000000001612f')
self._check_response(r)
def test_cmd_3des_ciphered(self):
spi = self.spi_base
spi['ciphering'] = True
spi['rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '00180d04193535b000000c8478b552a4ffc5a8f099b83cad7123')
def test_cmd_3des_signed(self):
spi = self.spi_base
spi['ciphering'] = False
spi['rc_cc_ds'] = 'cc'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '1502193535b00000000000000000072ea17bdb72060e00a40000023f00')
def test_cmd_3des_none(self):
spi = self.spi_base
spi['ciphering'] = False
spi['rc_cc_ds'] = 'no_rc_cc_ds'
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
self.assertEqual(b2h(r), '0d00193535b0000000000000000000a40000023f00')