mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-17 02:48:34 +03:00
Compare commits
15 Commits
chrysn/for
...
fixeria/cm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f38800643 | ||
|
|
d5c1bec869 | ||
|
|
7d05e49f11 | ||
|
|
98ea2a0f7a | ||
|
|
0a8d27ad7a | ||
|
|
9550a0a45b | ||
|
|
b5eaf14991 | ||
|
|
bdac3f61be | ||
|
|
05d30eb666 | ||
|
|
7800f9d356 | ||
|
|
7ce04a5a29 | ||
|
|
b3ea021b32 | ||
|
|
12175d3588 | ||
|
|
59f3b1154f | ||
|
|
98552ef1bd |
22
README.md
22
README.md
@@ -23,10 +23,10 @@ Git Repository
|
||||
|
||||
You can clone from the official Osmocom git repository using
|
||||
```
|
||||
git clone git://git.osmocom.org/pysim.git
|
||||
git clone https://gitea.osmocom.org/sim-card/pysim.git
|
||||
```
|
||||
|
||||
There is a cgit interface at <https://git.osmocom.org/pysim>
|
||||
There is a web interface at <https://gitea.osmocom.org/sim-card/pysim>.
|
||||
|
||||
|
||||
Installation
|
||||
@@ -35,18 +35,26 @@ Installation
|
||||
Please install the following dependencies:
|
||||
|
||||
- pyscard
|
||||
- serial
|
||||
- pyserial
|
||||
- pytlv
|
||||
- cmd2 >= 1.3.0 but < 2.0.0
|
||||
- jsonpath-ng
|
||||
- construct
|
||||
- construct >= 2.9.51
|
||||
- bidict
|
||||
- gsm0338
|
||||
- pyyaml >= 5.1
|
||||
- termcolor
|
||||
- colorlog
|
||||
|
||||
Example for Debian:
|
||||
```
|
||||
apt-get install python3-pyscard python3-serial python3-pip python3-yaml python3-termcolor python3-colorlog
|
||||
pip3 install -r requirements.txt
|
||||
```sh
|
||||
sudo apt-get install --no-install-recommends \
|
||||
pcscd libpcsclite-dev \
|
||||
python3 \
|
||||
python3-setuptools \
|
||||
python3-pyscard \
|
||||
python3-pip
|
||||
pip3 install --user -r requirements.txt
|
||||
```
|
||||
|
||||
After installing all dependencies, the pySim applications ``pySim-read.py``, ``pySim-prog.py`` and ``pySim-shell.py`` may be started directly from the cloned repository.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/sh
|
||||
#!/bin/sh -xe
|
||||
# jenkins build helper script for pysim. This is how we build on jenkins.osmocom.org
|
||||
#
|
||||
# environment variables:
|
||||
@@ -6,8 +6,6 @@
|
||||
# * PUBLISH: upload manuals after building if set to "1" (ignored without WITH_MANUALS = "1")
|
||||
#
|
||||
|
||||
set -e
|
||||
|
||||
if [ ! -d "./pysim-testdata/" ] ; then
|
||||
echo "###############################################"
|
||||
echo "Please call from pySim-prog top directory"
|
||||
@@ -17,16 +15,7 @@ fi
|
||||
|
||||
virtualenv -p python3 venv --system-site-packages
|
||||
. venv/bin/activate
|
||||
pip install pytlv
|
||||
pip install 'pyyaml>=5.1'
|
||||
pip install cmd2==1.5
|
||||
pip install jsonpath-ng
|
||||
pip install construct
|
||||
pip install bidict
|
||||
pip install gsm0338
|
||||
pip install termcolor
|
||||
pip install colorlog
|
||||
pip install pycryptodome
|
||||
pip install -r requirements.txt
|
||||
|
||||
# Execute automatically discovered unit tests first
|
||||
python -m unittest discover -v -s tests/
|
||||
@@ -37,8 +26,8 @@ python -m unittest discover -v -s tests/
|
||||
# Ignore E0401: import-error
|
||||
# pySim/utils.py:276: E0401: Unable to import 'Crypto.Cipher' (import-error)
|
||||
# pySim/utils.py:277: E0401: Unable to import 'Crypto.Util.strxor' (import-error)
|
||||
pip install pylint
|
||||
python -m pylint --errors-only \
|
||||
pip install pylint==2.14.5 # FIXME: 2.15 is crashing, see OS#5668
|
||||
python -m pylint -j0 --errors-only \
|
||||
--disable E1102 \
|
||||
--disable E0401 \
|
||||
--enable W0301 \
|
||||
|
||||
@@ -23,7 +23,7 @@ import json
|
||||
import traceback
|
||||
|
||||
import cmd2
|
||||
from cmd2 import style, fg
|
||||
from cmd2 import style, Fg
|
||||
from cmd2 import CommandSet, with_default_category, with_argparser
|
||||
import argparse
|
||||
|
||||
@@ -134,8 +134,8 @@ class PysimApp(cmd2.Cmd):
|
||||
|
||||
def __init__(self, card, rs, sl, ch, script=None):
|
||||
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
|
||||
use_ipython=True, auto_load_commands=False, startup_script=script)
|
||||
self.intro = style('Welcome to pySim-shell!', fg=fg.red)
|
||||
auto_load_commands=False, startup_script=script)
|
||||
self.intro = style('Welcome to pySim-shell!', fg=Fg.RED)
|
||||
self.default_category = 'pySim-shell built-in commands'
|
||||
self.card = None
|
||||
self.rs = None
|
||||
@@ -145,16 +145,16 @@ class PysimApp(cmd2.Cmd):
|
||||
self.ch = ch
|
||||
|
||||
self.numeric_path = False
|
||||
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names',
|
||||
self.add_settable(cmd2.Settable('numeric_path', bool, 'Print File IDs instead of names', self,
|
||||
onchange_cb=self._onchange_numeric_path))
|
||||
self.conserve_write = True
|
||||
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write',
|
||||
self.add_settable(cmd2.Settable('conserve_write', bool, 'Read and compare before write', self,
|
||||
onchange_cb=self._onchange_conserve_write))
|
||||
self.json_pretty_print = True
|
||||
self.add_settable(cmd2.Settable('json_pretty_print',
|
||||
bool, 'Pretty-Print JSON output'))
|
||||
bool, 'Pretty-Print JSON output', self))
|
||||
self.apdu_trace = False
|
||||
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card',
|
||||
self.add_settable(cmd2.Settable('apdu_trace', bool, 'Trace and display APDUs exchanged with card', self,
|
||||
onchange_cb=self._onchange_apdu_trace))
|
||||
|
||||
self.equip(card, rs)
|
||||
@@ -287,23 +287,23 @@ class PysimApp(cmd2.Cmd):
|
||||
sys.stderr = self._stderr_backup
|
||||
|
||||
def _show_failure_sign(self):
|
||||
self.poutput(style(" +-------------+", fg=fg.bright_red))
|
||||
self.poutput(style(" + ## ## +", fg=fg.bright_red))
|
||||
self.poutput(style(" + ## ## +", fg=fg.bright_red))
|
||||
self.poutput(style(" + ### +", fg=fg.bright_red))
|
||||
self.poutput(style(" + ## ## +", fg=fg.bright_red))
|
||||
self.poutput(style(" + ## ## +", fg=fg.bright_red))
|
||||
self.poutput(style(" +-------------+", fg=fg.bright_red))
|
||||
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" + ### +", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" + ## ## +", fg=Fg.LIGHT_RED))
|
||||
self.poutput(style(" +-------------+", fg=Fg.LIGHT_RED))
|
||||
self.poutput("")
|
||||
|
||||
def _show_success_sign(self):
|
||||
self.poutput(style(" +-------------+", fg=fg.bright_green))
|
||||
self.poutput(style(" + ## +", fg=fg.bright_green))
|
||||
self.poutput(style(" + ## +", fg=fg.bright_green))
|
||||
self.poutput(style(" + # ## +", fg=fg.bright_green))
|
||||
self.poutput(style(" + ## # +", fg=fg.bright_green))
|
||||
self.poutput(style(" + ## +", fg=fg.bright_green))
|
||||
self.poutput(style(" +-------------+", fg=fg.bright_green))
|
||||
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" + # ## +", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" + ## # +", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" + ## +", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput(style(" +-------------+", fg=Fg.LIGHT_GREEN))
|
||||
self.poutput("")
|
||||
|
||||
def _process_card(self, first, script_path):
|
||||
|
||||
@@ -2,7 +2,7 @@ from construct.lib.containers import Container, ListContainer
|
||||
from construct.core import EnumIntegerString
|
||||
import typing
|
||||
from construct import *
|
||||
from construct.core import evaluate, bytes2integer, integer2bytes, BitwisableString
|
||||
from construct.core import evaluate, BitwisableString
|
||||
from construct.lib import integertypes
|
||||
from pySim.utils import b2h, h2b, swap_nibbles
|
||||
import gsm0338
|
||||
@@ -219,7 +219,7 @@ class GreedyInteger(Construct):
|
||||
if evaluate(self.swapped, context):
|
||||
data = swapbytes(data)
|
||||
try:
|
||||
return bytes2integer(data, self.signed)
|
||||
return int.from_bytes(data, byteorder='big', signed=self.signed)
|
||||
except ValueError as e:
|
||||
raise IntegerError(str(e), path=path)
|
||||
|
||||
@@ -248,7 +248,7 @@ class GreedyInteger(Construct):
|
||||
raise IntegerError(f"value {obj} is not an integer", path=path)
|
||||
length = self.__bytes_required(obj, self.minlen)
|
||||
try:
|
||||
data = integer2bytes(obj, length, self.signed)
|
||||
data = obj.to_bytes(length, byteorder='big', signed=self.signed)
|
||||
except ValueError as e:
|
||||
raise IntegerError(str(e), path=path)
|
||||
if evaluate(self.swapped, context):
|
||||
|
||||
461
pySim/ota.py
461
pySim/ota.py
@@ -1,461 +0,0 @@
|
||||
"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
|
||||
|
||||
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from pySim.construct import *
|
||||
from pySim.utils import b2h
|
||||
from pySim.sms import UserDataHeader
|
||||
from construct import *
|
||||
from bidict import bidict
|
||||
import zlib
|
||||
import abc
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
|
||||
# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
|
||||
|
||||
# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data
|
||||
|
||||
# CAT_TP TCP/IP SMS
|
||||
# CPI 0x01 0x01 =IEIa=70,len=0
|
||||
# CHI NULL NULL NULL
|
||||
# CPI, CPL and CHL included in RC/CC/DS true true
|
||||
# RPI 0x02 0x02 =IEIa=71,len=0
|
||||
# RHI NULL NULL
|
||||
# RPI, RPL and RHL included in RC/CC/DS true true
|
||||
# packet-id 0-bf,ff 0-bf,ff
|
||||
# identification packet false 102 225 tbl 6
|
||||
|
||||
# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
|
||||
|
||||
# TS 102 225 Table 5
|
||||
ota_status_codes = bidict({
|
||||
0x00: 'PoR OK',
|
||||
0x01: 'RC/CC/DS failed',
|
||||
0x02: 'CNTR low',
|
||||
0x03: 'CNTR high',
|
||||
0x04: 'CNTR blocked',
|
||||
0x05: 'Ciphering error',
|
||||
0x06: 'Unidentified security error',
|
||||
0x07: 'Insufficient memory',
|
||||
0x08: 'more time',
|
||||
0x09: 'TAR unknown',
|
||||
0x0a: 'Insufficient security level',
|
||||
0x0b: 'Actual Response in SMS-SUBMIT', # 31.115
|
||||
0x0c: 'Actual Response in USSD', # 31.115
|
||||
})
|
||||
|
||||
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
|
||||
ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
|
||||
cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
|
||||
insufficient_memory=7, more_time_needed=8, tar_unknown=9,
|
||||
insufficient_security_level=0x0A,
|
||||
actual_response_sms_submit=0x0B,
|
||||
actual_response_ussd=0x0C)
|
||||
|
||||
# ETSI TS 102 226 Section 5.1.2
|
||||
CompactRemoteResp = Struct('number_of_commands'/Int8ub,
|
||||
'last_status_word'/HexAdapter(Bytes(2)),
|
||||
'last_response_data'/HexAdapter(GreedyBytes))
|
||||
|
||||
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
|
||||
|
||||
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
|
||||
SPI = BitStruct( # first octet
|
||||
Padding(3),
|
||||
'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
|
||||
counter_must_be_higher=2, counter_must_be_lower=3),
|
||||
'ciphering'/Flag,
|
||||
'rc_cc_ds'/RC_CC_DS,
|
||||
# second octet
|
||||
Padding(2),
|
||||
'por_in_submit'/Flag,
|
||||
'por_shall_be_ciphered'/Flag,
|
||||
'por_rc_cc_ds'/RC_CC_DS,
|
||||
'por'/Enum(BitsInteger(2), no_por=0,
|
||||
por_required=1, por_only_when_error=2)
|
||||
)
|
||||
|
||||
# TS 102 225 Section 5.1.2
|
||||
KIC = BitStruct('key'/BitsInteger(4),
|
||||
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
|
||||
aes_cbc=2)
|
||||
)
|
||||
|
||||
# TS 102 225 Section 5.1.3.1
|
||||
KID_CC = BitStruct('key'/BitsInteger(4),
|
||||
'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
|
||||
aes_cmac=2)
|
||||
)
|
||||
|
||||
# TS 102 225 Section 5.1.3.2
|
||||
KID_RC = BitStruct('key'/BitsInteger(4),
|
||||
'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
|
||||
)
|
||||
|
||||
SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
|
||||
'cmd_hdr_len'/Int8ub,
|
||||
'spi'/SPI,
|
||||
'kic'/KIC,
|
||||
'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
|
||||
'tar'/Bytes(3),
|
||||
'secured_data'/GreedyBytes)
|
||||
|
||||
class OtaKeyset:
|
||||
"""The OTA related data (key material, counter) to be used in encrypt/decrypt."""
|
||||
def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
|
||||
algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
|
||||
self.algo_crypt = algo_crypt
|
||||
self.kic = bytes(kic)
|
||||
self.kic_idx = kic_idx
|
||||
self.algo_auth = algo_auth
|
||||
self.kid = bytes(kid)
|
||||
self.kid_idx = kid_idx
|
||||
self.cntr = cntr
|
||||
|
||||
@property
|
||||
def auth(self):
|
||||
"""Return an instance of the matching OtaAlgoAuth."""
|
||||
return OtaAlgoAuth.fromKeyset(self)
|
||||
|
||||
@property
|
||||
def crypt(self):
|
||||
"""Return an instance of the matching OtaAlgoCrypt."""
|
||||
return OtaAlgoCrypt.fromKeyset(self)
|
||||
|
||||
class OtaCheckError(Exception):
|
||||
pass
|
||||
|
||||
class OtaDialect(abc.ABC):
|
||||
"""Base Class for OTA dialects such as SMS, BIP, ..."""
|
||||
|
||||
def _compute_sig_len(self, spi:SPI):
|
||||
if spi['rc_cc_ds'] == 'no_rc_cc_ds':
|
||||
return 0
|
||||
elif spi['rc_cc_ds'] == 'rc': # CRC-32
|
||||
return 4
|
||||
elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
|
||||
# TODO: this is not entirely correct, as in AES case it could be 4 or 8
|
||||
return 8
|
||||
else:
|
||||
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
|
||||
|
||||
@abc.abstractmethod
|
||||
def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
|
||||
"""Decode a response into a response packet and, if indicted (by a
|
||||
response status of `"por_ok"`) a decoded response.
|
||||
|
||||
The response packet's common characteristics are not fully determined,
|
||||
and (so far) completely proprietary per dialect."""
|
||||
pass
|
||||
|
||||
|
||||
from Crypto.Cipher import DES, DES3, AES
|
||||
from Crypto.Hash import CMAC
|
||||
|
||||
class OtaAlgo(abc.ABC):
|
||||
iv = b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
blocksize = None
|
||||
enum_name = None
|
||||
|
||||
@staticmethod
|
||||
def _get_padding(in_len: int, multiple: int, padding: int = 0):
|
||||
"""Return padding bytes towards multiple of N."""
|
||||
if in_len % multiple == 0:
|
||||
return b''
|
||||
pad_cnt = multiple - (in_len % multiple)
|
||||
return b'\x00' * pad_cnt
|
||||
|
||||
@staticmethod
|
||||
def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
|
||||
"""Pad input bytes to multiple of N."""
|
||||
return indat + OtaAlgo._get_padding(len(indat), multiple, padding)
|
||||
|
||||
def pad_to_blocksize(self, indat: bytes, padding: int = 0):
|
||||
"""Pad the given input data to multiple of the cipher block size."""
|
||||
return self._pad_to_multiple(indat, self.blocksize, padding)
|
||||
|
||||
def __init__(self, otak: OtaKeyset):
|
||||
self.otak = otak
|
||||
|
||||
def __str__(self):
|
||||
return self.__class__.__name__
|
||||
|
||||
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
|
||||
def __init__(self, otak: OtaKeyset):
|
||||
if self.enum_name != otak.algo_crypt:
|
||||
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
|
||||
super().__init__(otak)
|
||||
|
||||
def encrypt(self, data:bytes) -> bytes:
|
||||
"""Encrypt given input bytes using the key material given in constructor."""
|
||||
padded_data = self.pad_to_blocksize(data)
|
||||
return self._encrypt(data)
|
||||
|
||||
def decrypt(self, data:bytes) -> bytes:
|
||||
"""Decrypt given input bytes using the key material given in constructor."""
|
||||
return self._decrypt(data)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
"""Actual implementation, to be implemented by derived class."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def _decrypt(self, data:bytes) -> bytes:
|
||||
"""Actual implementation, to be implemented by derived class."""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
|
||||
"""Resolve the class for the encryption algorithm of otak and instantiate it."""
|
||||
for subc in cls.__subclasses__():
|
||||
if subc.enum_name == otak.algo_crypt:
|
||||
return subc(otak)
|
||||
raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth)
|
||||
|
||||
class OtaAlgoAuth(OtaAlgo, abc.ABC):
|
||||
def __init__(self, otak: OtaKeyset):
|
||||
if self.enum_name != otak.algo_auth:
|
||||
raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
|
||||
super().__init__(otak)
|
||||
|
||||
def sign(self, data:bytes) -> bytes:
|
||||
"""Compute the CC/CR check bytes for the input data using key material
|
||||
given in constructor."""
|
||||
padded_data = self.pad_to_blocksize(data)
|
||||
sig = self._sign(padded_data)
|
||||
return sig
|
||||
|
||||
def check_sig(self, data:bytes, cc_received:bytes):
|
||||
"""Compute the CC/CR check bytes for the input data and compare against cc_received."""
|
||||
cc = self.sign(data)
|
||||
if cc_received != cc:
|
||||
raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))
|
||||
|
||||
@abc.abstractmethod
|
||||
def _sign(self, data:bytes) -> bytes:
|
||||
"""Actual implementation, to be implemented by derived class."""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
|
||||
"""Resolve the class for the authentication algorithm of otak and instantiate it."""
|
||||
for subc in cls.__subclasses__():
|
||||
if subc.enum_name == otak.algo_auth:
|
||||
return subc(otak)
|
||||
raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
|
||||
|
||||
class OtaAlgoCryptDES(OtaAlgoCrypt):
|
||||
"""DES is insecure. For backwards compatibility with pre-Rel8"""
|
||||
name = 'DES'
|
||||
enum_name = 'single_des'
|
||||
blocksize = 8
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
|
||||
return cipher.encrypt(data)
|
||||
|
||||
def _decrypt(self, data:bytes) -> bytes:
|
||||
cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
|
||||
return cipher.decrypt(data)
|
||||
|
||||
class OtaAlgoAuthDES(OtaAlgoAuth):
|
||||
"""DES is insecure. For backwards compatibility with pre-Rel8"""
|
||||
name = 'DES'
|
||||
enum_name = 'single_des'
|
||||
blocksize = 8
|
||||
def _sign(self, data:bytes) -> bytes:
|
||||
cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv)
|
||||
ciph = cipher.encrypt(data)
|
||||
return ciph[len(ciph) - 8:]
|
||||
|
||||
class OtaAlgoCryptDES3(OtaAlgoCrypt):
|
||||
name = '3DES'
|
||||
enum_name = 'triple_des_cbc2'
|
||||
blocksize = 8
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
|
||||
return cipher.encrypt(data)
|
||||
|
||||
def _decrypt(self, data:bytes) -> bytes:
|
||||
cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
|
||||
return cipher.decrypt(data)
|
||||
|
||||
class OtaAlgoAuthDES3(OtaAlgoAuth):
|
||||
name = '3DES'
|
||||
enum_name = 'triple_des_cbc2'
|
||||
blocksize = 8
|
||||
def _sign(self, data:bytes) -> bytes:
|
||||
cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv)
|
||||
ciph = cipher.encrypt(data)
|
||||
return ciph[len(ciph) - 8:]
|
||||
|
||||
class OtaAlgoCryptAES(OtaAlgoCrypt):
|
||||
name = 'AES'
|
||||
enum_name = 'aes_cbc'
|
||||
blocksize = 16 # TODO: is this needed?
|
||||
def _encrypt(self, data:bytes) -> bytes:
|
||||
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
|
||||
return cipher.encrypt(data)
|
||||
|
||||
def _decrypt(self, data:bytes) -> bytes:
|
||||
cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
|
||||
return cipher.decrypt(data)
|
||||
|
||||
class OtaAlgoAuthAES(OtaAlgoAuth):
|
||||
name = 'AES'
|
||||
enum_name = 'aes_cmac'
|
||||
blocksize = 16 # TODO: is this needed?
|
||||
def _sign(self, data:bytes) -> bytes:
|
||||
cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
|
||||
cmac.update(data)
|
||||
ciph = cmac.digest()
|
||||
return ciph[len(ciph) - 8:]
|
||||
|
||||
|
||||
|
||||
class OtaDialectSms(OtaDialect):
|
||||
"""OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
|
||||
SmsResponsePacket = Struct('rpl'/Int16ub,
|
||||
'rhl'/Int8ub,
|
||||
'tar'/Bytes(3),
|
||||
'cntr'/Bytes(5),
|
||||
'pcntr'/Int8ub,
|
||||
'response_status'/ResponseStatus,
|
||||
'cc_rc'/Bytes(this.rhl-10),
|
||||
'secured_data'/GreedyBytes)
|
||||
|
||||
def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
|
||||
# length of signature in octets
|
||||
len_sig = self._compute_sig_len(spi)
|
||||
pad_cnt = 0
|
||||
if spi['ciphering']: # ciphering is requested
|
||||
# append padding bytes to end up with blocksize
|
||||
len_cipher = 6 + len_sig + len(apdu)
|
||||
padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
|
||||
pad_cnt = len(padding)
|
||||
apdu += padding
|
||||
|
||||
kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
|
||||
kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}
|
||||
|
||||
# CHL = number of octets from (and including) SPI to the end of RC/CC/DS
|
||||
# 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
|
||||
chl = 13 + len_sig
|
||||
|
||||
# CHL + SPI (+ KIC + KID)
|
||||
c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
|
||||
part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
|
||||
#print("part_head: %s" % b2h(part_head))
|
||||
|
||||
# CNTR + PCNTR (CNTR not used)
|
||||
part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
|
||||
#print("part_cnt: %s" % b2h(part_cnt))
|
||||
|
||||
envelope_data = part_head + part_cnt + apdu
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
# 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
|
||||
# CPL from and including CPI to end of secured data, including any padding for ciphering
|
||||
cpl = len(envelope_data) + len_sig
|
||||
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
|
||||
#print("envelope_data with cpl: %s" % b2h(envelope_data))
|
||||
|
||||
if spi['rc_cc_ds'] == 'cc':
|
||||
cc = otak.auth.sign(envelope_data)
|
||||
envelope_data = part_cnt + cc + apdu
|
||||
elif spi['rc_cc_ds'] == 'rc':
|
||||
# CRC32
|
||||
crc32 = zlib.crc32(envelope_data) & 0xffffffff
|
||||
envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
|
||||
elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
|
||||
envelope_data = part_cnt + apdu
|
||||
else:
|
||||
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
|
||||
|
||||
#print("envelope_data with sig: %s" % b2h(envelope_data))
|
||||
|
||||
# encrypt as needed
|
||||
if spi['ciphering']: # ciphering is requested
|
||||
ciph = otak.crypt.encrypt(envelope_data)
|
||||
envelope_data = part_head + ciph
|
||||
# prefix with another CPL
|
||||
cpl = len(envelope_data)
|
||||
envelope_data = cpl.to_bytes(2, 'big') + envelope_data
|
||||
else:
|
||||
envelope_data = part_head + envelope_data
|
||||
|
||||
#print("envelope_data: %s" % b2h(envelope_data))
|
||||
|
||||
return envelope_data
|
||||
|
||||
def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
|
||||
if isinstance(data, str):
|
||||
data = h2b(data)
|
||||
# plain-text POR: 027100000e0ab000110000000000000001612f
|
||||
# UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
|
||||
# 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
|
||||
# POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
|
||||
# POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
|
||||
if data[0] != 0x02:
|
||||
raise ValueError('Unexpected UDL=0x%02x' % data[0])
|
||||
udhd, remainder = UserDataHeader.fromBytes(data)
|
||||
if not udhd.has_ie(0x71):
|
||||
raise ValueError('RPI 0x71 not found in UDH')
|
||||
rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered
|
||||
res = self.SmsResponsePacket.parse(remainder)
|
||||
|
||||
if spi['por_shall_be_ciphered']:
|
||||
# decrypt
|
||||
ciphered_part = remainder[6:]
|
||||
deciph = otak.crypt.decrypt(ciphered_part)
|
||||
temp_data = rph_rhl_tar + deciph
|
||||
res = self.SmsResponsePacket.parse(temp_data)
|
||||
# remove specified number of padding bytes, if any
|
||||
if res['pcntr'] != 0:
|
||||
# this conditional is needed as python [:-0] renders an empty return!
|
||||
res['secured_data'] = res['secured_data'][:-res['pcntr']]
|
||||
remainder = temp_data
|
||||
|
||||
# is there a CC/RC present?
|
||||
len_sig = res['rhl'] - 10
|
||||
if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
|
||||
if len_sig:
|
||||
raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
|
||||
elif spi['por_rc_cc_ds'] == 'cc':
|
||||
# verify signature
|
||||
# UDH is part of CC/RC!
|
||||
udh = data[:3]
|
||||
# RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC
|
||||
rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
|
||||
# remove the CC/RC bytes
|
||||
temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
|
||||
cc = otak.auth.check_sig(temp_data, res['cc_rc'])
|
||||
# TODO: CRC
|
||||
else:
|
||||
raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
|
||||
|
||||
# TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
|
||||
if res.response_status == 'por_ok':
|
||||
dec = CompactRemoteResp.parse(res['secured_data'])
|
||||
else:
|
||||
dec = None
|
||||
return (res, dec)
|
||||
53
pySim/sms.py
53
pySim/sms.py
@@ -1,53 +0,0 @@
|
||||
"""Code related to SMS Encoding/Decoding"""
|
||||
# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
|
||||
# module to python3, and I gave up after >= 3 hours of trying and failing to do so
|
||||
|
||||
# (C) 2022 by Harald Welte <laforge@osmocom.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import typing
|
||||
from construct import Int8ub, Bytes
|
||||
from construct import Struct, Tell, this, RepeatUntil
|
||||
|
||||
from pySim.utils import Hexstr, h2b, b2h
|
||||
|
||||
BytesOrHex = typing.Union[Hexstr, bytes]
|
||||
|
||||
class UserDataHeader:
|
||||
# a single IE in the user data header
|
||||
ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length))
|
||||
# parser for the full UDH: Length octet followed by sequence of IEs
|
||||
_construct = Struct('udhl'/Int8ub,
|
||||
# FIXME: somehow the below lambda is not working, we always only get the first IE?
|
||||
'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c))
|
||||
|
||||
def __init__(self, ies=[]):
|
||||
self.ies = ies
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return 'UDH(%r)' % self.ies
|
||||
|
||||
def has_ie(self, iei:int) -> bool:
|
||||
for ie in self.ies:
|
||||
if ie['iei'] == iei:
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
|
||||
if isinstance(inb, str):
|
||||
inb = h2b(inb)
|
||||
res = cls._construct.parse(inb)
|
||||
return cls(res['ies']), inb[1+res['udhl']:]
|
||||
@@ -10,7 +10,7 @@ from typing import Optional, Tuple
|
||||
from pySim.exceptions import *
|
||||
from pySim.construct import filter_dict
|
||||
from pySim.utils import sw_match, b2h, h2b, i2h, Hexstr
|
||||
from pySim.cat import ProactiveCommand
|
||||
from pySim.cat import ProactiveCommand, CommandDetails, DeviceIdentities, Result
|
||||
|
||||
#
|
||||
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
||||
@@ -42,10 +42,7 @@ class ProactiveHandler(abc.ABC):
|
||||
"""Abstract base class representing the interface of some code that handles
|
||||
the proactive commands, as returned by the card in responses to the FETCH
|
||||
command."""
|
||||
def receive_fetch_raw(self, payload: Hexstr):
|
||||
# parse the proactive command
|
||||
pcmd = ProactiveCommand()
|
||||
parsed = pcmd.from_tlv(h2b(payload))
|
||||
def receive_fetch_raw(self, pcmd: ProactiveCommand, parsed: Hexstr):
|
||||
# try to find a generic handler like handle_SendShortMessage
|
||||
handle_name = 'handle_%s' % type(parsed).__name__
|
||||
if hasattr(self, handle_name):
|
||||
@@ -160,13 +157,57 @@ class LinkBase(abc.ABC):
|
||||
sw : string (in hex) of status word (ex. "9000")
|
||||
"""
|
||||
rv = self.send_apdu(pdu)
|
||||
last_sw = rv[1]
|
||||
|
||||
while sw == '9000' and sw_match(rv[1], '91xx'):
|
||||
while sw == '9000' and sw_match(last_sw, '91xx'):
|
||||
# It *was* successful after all -- the extra pieces FETCH handled
|
||||
# need not concern the caller.
|
||||
rv = (rv[0], '9000')
|
||||
# proactive sim as per TS 102 221 Setion 7.4.2
|
||||
rv = self.send_apdu_checksw('80120000' + rv[1][2:], sw)
|
||||
print("FETCH: %s" % rv[0])
|
||||
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
|
||||
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
|
||||
# Setting this in case we later decide not to send a terminal
|
||||
# response immediately unconditionally -- the card may still have
|
||||
# something pending even though the last command was not processed
|
||||
# yet.
|
||||
last_sw = fetch_rv[1]
|
||||
# parse the proactive command
|
||||
pcmd = ProactiveCommand()
|
||||
parsed = pcmd.from_tlv(h2b(fetch_rv[0]))
|
||||
print("FETCH: %s (%s)" % (fetch_rv[0], type(parsed).__name__))
|
||||
result = Result()
|
||||
if self.proactive_handler:
|
||||
self.proactive_handler.receive_fetch_raw(rv[0])
|
||||
# Extension point: If this does return a list of TLV objects,
|
||||
# they could be appended after the Result; if the first is a
|
||||
# Result, that cuold replace the one built here.
|
||||
self.proactive_handler.receive_fetch_raw(pcmd, parsed)
|
||||
result.from_dict({'general_result': 'performed_successfully', 'additional_information': ''})
|
||||
else:
|
||||
result.from_dict({'general_result': 'command_beyond_terminal_capability', 'additional_information': ''})
|
||||
|
||||
# Send response immediately, thus also flushing out any further
|
||||
# proactive commands that the card already wants to send
|
||||
#
|
||||
# Structure as per TS 102 223 V4.4.0 Section 6.8
|
||||
|
||||
# The Command Details are echoed from the command that has been processed.
|
||||
(command_details,) = [c for c in pcmd.decoded.children if isinstance(c, CommandDetails)]
|
||||
# The Device Identities are fixed. (TS 102 223 V4.0.0 Section 6.8.2)
|
||||
device_identities = DeviceIdentities()
|
||||
device_identities.from_dict({'source_dev_id': 'terminal', 'dest_dev_id': 'uicc'})
|
||||
|
||||
# Testing hint: The value of tail does not influence the behavior
|
||||
# of an SJA2 that sent ans SMS, so this is implemented only
|
||||
# following TS 102 223, and not fully tested.
|
||||
tail = command_details.to_tlv() + device_identities.to_tlv() + result.to_tlv()
|
||||
# Testing hint: In contrast to the above, this part is positively
|
||||
# essential to get the SJA2 to provide the later parts of a
|
||||
# multipart SMS in response to an OTA RFM command.
|
||||
terminal_response = '80140000' + b2h(len(tail).to_bytes(1, 'big') + tail)
|
||||
|
||||
terminal_response_rv = self.send_apdu(terminal_response)
|
||||
last_sw = terminal_response_rv[1]
|
||||
|
||||
if not sw_match(rv[1], sw):
|
||||
raise SwMatchError(rv[1], sw.lower(), self.sw_interpreter)
|
||||
return rv
|
||||
|
||||
@@ -3,10 +3,9 @@ pyserial
|
||||
pytlv
|
||||
cmd2==1.5
|
||||
jsonpath-ng
|
||||
construct
|
||||
construct>=2.9.51
|
||||
bidict
|
||||
gsm0338
|
||||
pyyaml>=5.1
|
||||
termcolor
|
||||
colorlog
|
||||
pycryptodome
|
||||
|
||||
5
setup.py
5
setup.py
@@ -14,12 +14,11 @@ setup(
|
||||
"pytlv",
|
||||
"cmd2 >= 1.3.0, < 2.0.0",
|
||||
"jsonpath-ng",
|
||||
"construct >= 2.9",
|
||||
"construct >= 2.9.51",
|
||||
"bidict",
|
||||
"gsm0338",
|
||||
"termcolor",
|
||||
"colorlog",
|
||||
"pycryptodome"
|
||||
"colorlog"
|
||||
],
|
||||
scripts=[
|
||||
'pySim-prog.py',
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
from pySim.utils import h2b, b2h
|
||||
from pySim.ota import *
|
||||
|
||||
class Test_SMS_3DES(unittest.TestCase):
|
||||
tar = h2b('b00000')
|
||||
"""Test the OtaDialectSms for 3DES algorithms."""
|
||||
def __init__(self, foo, **kwargs):
|
||||
super().__init__(foo, **kwargs)
|
||||
# KIC1 + KID1 of 8988211000000467285
|
||||
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
|
||||
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
|
||||
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
|
||||
KID3 = h2b('12110C78E678C25408233076AA033615')
|
||||
self.od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
self.dialect = OtaDialectSms()
|
||||
self.spi_base = {
|
||||
'counter':'no_counter',
|
||||
'ciphering': True,
|
||||
'rc_cc_ds': 'cc',
|
||||
'por_in_submit':False,
|
||||
'por': 'por_required',
|
||||
'por_shall_be_ciphered': True,
|
||||
'por_rc_cc_ds': 'cc',
|
||||
}
|
||||
|
||||
def _check_response(self, r):
|
||||
self.assertEqual(r['number_of_commands'], 1)
|
||||
self.assertEqual(r['last_status_word'], '612f')
|
||||
self.assertEqual(r['last_response_data'], u'')
|
||||
self.assertEqual(r['response_status'], 'por_ok')
|
||||
|
||||
def test_resp_3des_ciphered(self):
|
||||
spi = self.spi_base
|
||||
spi['por_shall_be_ciphered'] = True
|
||||
spi['por_rc_cc_ds'] = 'cc'
|
||||
r = self.dialect.decode_resp(self.od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
|
||||
self._check_response(r)
|
||||
|
||||
def test_resp_3des_signed(self):
|
||||
spi = self.spi_base
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'cc'
|
||||
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
|
||||
self._check_response(r)
|
||||
|
||||
def test_resp_3des_signed_err(self):
|
||||
"""Expect an OtaCheckError exception if the computed CC != received CC"""
|
||||
spi = self.spi_base
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'cc'
|
||||
with self.assertRaises(OtaCheckError) as context:
|
||||
r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb02612f')
|
||||
self.assertTrue('!= Computed CC' in str(context.exception))
|
||||
|
||||
def test_resp_3des_none(self):
|
||||
spi = self.spi_base
|
||||
spi['por_shall_be_ciphered'] = False
|
||||
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
r = self.dialect.decode_resp(self.od, spi, '027100000e0ab000110000000000000001612f')
|
||||
self._check_response(r)
|
||||
|
||||
def test_cmd_3des_ciphered(self):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = True
|
||||
spi['rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
self.assertEqual(b2h(r), '00180d04193535b000000c8478b552a4ffc5a8f099b83cad7123')
|
||||
|
||||
def test_cmd_3des_signed(self):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = False
|
||||
spi['rc_cc_ds'] = 'cc'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
self.assertEqual(b2h(r), '1502193535b00000000000000000072ea17bdb72060e00a40000023f00')
|
||||
|
||||
def test_cmd_3des_none(self):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = False
|
||||
spi['rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
self.assertEqual(b2h(r), '0d00193535b0000000000000000000a40000023f00')
|
||||
Reference in New Issue
Block a user