Compare commits

..

11 Commits

Author SHA1 Message Date
Harald Welte
890e1951fe Implement Global Platform SCP03
This adds an implementation of the GlobalPlatform SCP03 protocol. It has
been tested in S8 mode for C-MAC, C-ENC, R-MAC and R-ENC with AES using
128, 192 and 256 bit key lengh.  Test vectors generated while talking to
a sysmoEUICC1-C2T are included as unit tests.

Change-Id: Ibc35af5474923aed2e3bcb29c8d713b4127a160d
2024-02-04 15:11:08 +01:00
Harald Welte
dd6895ff06 rename global_platform.scp02 to global_platform.scp
This is in preparation of extending it to cover SCP03 in a follow-up
patch.

Change-Id: Idc0afac6e95f89ddaf277a89f9c95607e70a471c
2024-02-04 15:06:50 +01:00
Harald Welte
3a0f881d84 Contstrain argparse integers to permitted range
In many casese we used type=int permitting any integer value, positive
or negative without a constratint in size.  However, in reality often
we're constrained to unsigned 8 or 16 bit ranges.  Let's use the
auto_uint{8,16} functions to enforce this within argparse before
we even try to encode something that won't work.

Change-Id: I35c81230bc18e2174ec1930aa81463f03bcd69c8
2024-02-04 14:53:07 +01:00
Harald Welte
4b23130da9 global_platform: Fix --key-id argument
The key-id is actually a 7-bit integer and on the wire the 8th bit
has a special meaning which can be derived automatically.

Let's unburden the user from explicitly encoding that 8th bit and
instead set it automatically.

Change-Id: I8da37aa8fd064e6d35ed29a70f5d7a0e9060be3a
2024-02-04 14:53:07 +01:00
Harald Welte
c84c04afb5 global_platform: add delete_key and delete_card_content
This GlobalPlatform command is used to delete applications/load-files
or keys.

Change-Id: Ib5d18e983d0e918633d7c090c54fb9a3384a22e5
2024-02-04 14:53:07 +01:00
Harald Welte
0e9ad7b5d4 global_platform: add set_status command
Using this command, one can change the life cycle status of on-card
applications, specifically one can LOCK (disable) them and re-enable
them as needed.

Change-Id: Ie14297a119d01cad1284f315a2508aa92cb4633b
2024-02-04 14:53:07 +01:00
Harald Welte
5dc8471526 global_platform: Add install_for_personalization command
This allows us to perform STORE DATA on applications like ARA-M/ARA-D
after establishing SCP02 to the related security domain.

Change-Id: I2ce766b97bba42c64c4d4492b505be66c24f471e
2024-02-04 14:53:07 +01:00
Harald Welte
5bbd720512 pySim-shell: Make 'apdu' command use logical (and secure) channel
The 'apdu' command so far bypassed the logical channel and also
the recently-introduced support for secure channels.  Let's change
that, at least by default.  If somebody wants a raw APDU without
secure / logical channel processing, they may use the --raw option.

Change-Id: Id0c364f772c31e11e8dfa21624d8685d253220d0
2024-02-04 14:53:07 +01:00
Harald Welte
71a7a19159 SCP02: Only C-MAC/C-ENCRYPT APDUs whose CLA byte indicates GlobalPlatform
I'm not entirely sure if this is the right thing to do.  For sure I do
have cards which don't like SELECT with C-MAC appended... and
GlobalPlatform clearly states SELECT is coded with CLA value that has
the MSB not set (i.e. not a GlobalPlatform command).

Change-Id: Ieda75c865a6ff2725fc3c8772bb274d96b8a5a43
2024-02-04 14:53:07 +01:00
Harald Welte
ca3678e471 Add global_platform shell command establish_scp02 and release_scp
Those commands can be used to establish and release a SCP02 secure
channel on the currently active logical channel.

The prompt is adjusted with a 'SCP02' prefix while the secure channel is
established.

Change-Id: Ib2f3c8f0563f81a941dd55b97c9836e3a6856407
2024-02-04 14:53:07 +01:00
Harald Welte
1e052f1efa Introduce GlobalPlatform SCP02 implementation
This implementation of GlobalPlatform SCP02 currently only supports
C-MAC and C-ENC, but no R-MAC or R-ENC yet.

The patch also introduces the notion of having a SCP instance associated
with a SimCardCommands instance.  No code is using this yet, it will be
introduced in a separate patch.

Change-Id: I56020382b9dfe8ba0f7c1c9f71eb1a9746bc5a27
2024-02-04 14:53:07 +01:00
87 changed files with 726 additions and 2207 deletions

View File

@@ -1,79 +0,0 @@
#!/usr/bin/env python3
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import argparse
from pySim.esim import es2p
EID_HELP='EID of the eUICC for which eSIM shall be made available'
ICCID_HELP='The ICCID of the eSIM that shall be made available'
MATCHID_HELP='MatchingID that shall be used by profile download'
parser = argparse.ArgumentParser(description="""
Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In
production use cases, this would be the GSMA Root CA (CI) certificate.""")
subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call")
parser_dlo = subparsers.add_parser('download-order', help="ES2+ DownloadOrder function")
parser_dlo.add_argument('--eid', help=EID_HELP)
parser_dlo.add_argument('--iccid', help=ICCID_HELP)
parser_dlo.add_argument('--profileType', help='The profile type of which one eSIM shall be made available')
parser_cfo = subparsers.add_parser('confirm-order', help="ES2+ ConfirmOrder function")
parser_cfo.add_argument('--iccid', required=True, help=ICCID_HELP)
parser_cfo.add_argument('--eid', help=EID_HELP)
parser_cfo.add_argument('--matchingId', help=MATCHID_HELP)
parser_cfo.add_argument('--confirmationCode', help='Confirmation code that shall be used by profile download')
parser_cfo.add_argument('--smdsAddress', help='SM-DS Address')
parser_cfo.add_argument('--releaseFlag', action='store_true', help='Shall the profile be immediately released?')
parser_co = subparsers.add_parser('cancel-order', help="ES2+ CancelOrder function")
parser_co.add_argument('--iccid', required=True, help=ICCID_HELP)
parser_co.add_argument('--eid', help=EID_HELP)
parser_co.add_argument('--matchingId', help=MATCHID_HELP)
parser_co.add_argument('--finalProfileStatusIndicator', required=True, choices=['Available','Unavailable'])
parser_rp = subparsers.add_parser('release-profile', help='ES2+ ReleaseProfile function')
parser_rp.add_argument('--iccid', required=True, help=ICCID_HELP)
if __name__ == '__main__':
opts = parser.parse_args()
#print(opts)
peer = es2p.Es2pApiClient(opts.url, opts.id, server_cert_verify=opts.server_ca_cert, client_cert=opts.client_cert)
data = {}
for k, v in vars(opts).items():
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
# remove keys from dict that shold not end up in JSON...
continue
if v is not None:
data[k] = v
print(data)
if opts.command == 'download-order':
res = peer.call_downloadOrder(data)
elif opts.command == 'confirm-order':
res = peer.call_confirmOrder(data)
elif opts.command == 'cancel-order':
res = peer.call_cancelOrder(data)
elif opts.command == 'release-profile':
res = peer.call_releaseProfile(data)

View File

@@ -45,8 +45,7 @@ case "$JOB_TYPE" in
--disable E1102 \
--disable E0401 \
--enable W0301 \
pySim tests/*.py *.py \
contrib/es2p_client.py
pySim *.py
;;
"docs")
rm -rf docs/_build

View File

@@ -473,18 +473,7 @@ sequence including the electrical power down.
:module: pySim.ts_102_221
:func: CardProfileUICC.AddlShellCommands.resume_uicc_parser
terminal_capability
~~~~~~~~~~~~~~~~~~~
This command allows you to perform the TERMINAL CAPABILITY command towards the card.
TS 102 221 specifies the TERMINAL CAPABILITY command using which the
terminal (Software + hardware talking to the card) can expose their
capabilities. This is also used in the eUICC universe to let the eUICC
know which features are supported.
.. argparse::
:module: pySim.ts_102_221
:func: CardProfileUICC.AddlShellCommands.term_cap_parser
Linear Fixed EF commands
@@ -986,13 +975,7 @@ install_for_personalization
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. argparse::
:module: pySim.global_platform
:func: ADF_SD.AddlShellCommands.inst_perso_parser
install_for_install
~~~~~~~~~~~~~~~~~~~
.. argparse::
:module: pySim.global_platform
:func: ADF_SD.AddlShellCommands.inst_inst_parser
:func: ADF_SD.AddlShellCommands.inst_for_perso_parser
delete_card_content
~~~~~~~~~~~~~~~~~~~

View File

@@ -35,7 +35,6 @@ import asn1tools
from pySim.utils import h2b, b2h, swap_nibbles
import pySim.esim.rsp as rsp
from pySim.esim import saip
from pySim.esim.es8p import *
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
@@ -364,14 +363,11 @@ class SmDppHttpServer:
if not os.path.isfile(path) or not os.access(path, os.R_OK):
raise ApiError('8.2.6', '3.8', 'Refused')
ss.matchingId = matchingId
with open(path, 'rb') as f:
pes = saip.ProfileElementSequence.from_der(f.read())
iccid_str = b2h(pes.get_pe_for_type('header').decoded['iccid'])
# FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
# Put together profileMetadata + _bin
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
ss.profileMetadata = ProfileMetadata(iccid_bin= h2b(swap_nibbles('89000123456789012358')), spn="OsmocomSPN", profile_name="OsmocomProfile")
profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
# Put together smdpSigned2 + _bin

View File

@@ -250,7 +250,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
# noted that the apdu command plays an exceptional role since it is the only card accessing command that
# can be executed without the presence of a runtime state (self.rs) object. However, this also means that
# self.lchan is also not present (see method equip).
if opts.raw or self.lchan is None:
if opts.raw:
data, sw = self.card._scc.send_apdu(opts.APDU)
else:
data, sw = self.lchan.scc.send_apdu(opts.APDU)

View File

@@ -1,3 +1,4 @@
# coding=utf-8
"""APDU (and TPDU) parser for UICC/USIM/ISIM cards.
The File (and its classes) represent the structure / hierarchy
@@ -26,13 +27,12 @@ we already know in pySim about the filesystem structure, file encoding, etc.
import abc
from termcolor import colored
import typing
from typing import List, Dict, Optional
from termcolor import colored
from construct import Byte, GreedyBytes
from construct import *
from construct import Optional as COptional
from pySim.construct import *
from pySim.utils import *
from pySim.runtime import RuntimeLchan, RuntimeState, lchan_nr_from_cla
@@ -52,8 +52,8 @@ from pySim.filesystem import CardADF, CardFile, TransparentEF, LinFixedEF
class ApduCommandMeta(abc.ABCMeta):
"""A meta-class that we can use to set some class variables when declaring
a derived class of ApduCommand."""
def __new__(mcs, name, bases, namespace, **kwargs):
x = super().__new__(mcs, name, bases, namespace)
def __new__(metacls, name, bases, namespace, **kwargs):
x = super().__new__(metacls, name, bases, namespace)
x._name = namespace.get('name', kwargs.get('n', None))
x._ins = namespace.get('ins', kwargs.get('ins', None))
x._cla = namespace.get('cla', kwargs.get('cla', None))
@@ -187,39 +187,44 @@ class ApduCommand(Apdu, metaclass=ApduCommandMeta):
if apdu_case in [1, 2]:
# data is part of response
return cls(buffer[:5], buffer[5:])
if apdu_case in [3, 4]:
elif apdu_case in [3, 4]:
# data is part of command
lc = buffer[4]
return cls(buffer[:5+lc], buffer[5+lc:])
raise ValueError('%s: Invalid APDU Case %u' % (cls.__name__, apdu_case))
else:
raise ValueError('%s: Invalid APDU Case %u' % (cls.__name__, apdu_case))
@property
def path(self) -> List[str]:
"""Return (if known) the path as list of files to the file on which this command operates."""
if self.file:
return self.file.fully_qualified_path()
return []
else:
return []
@property
def path_str(self) -> str:
"""Return (if known) the path as string to the file on which this command operates."""
if self.file:
return self.file.fully_qualified_path_str()
return ''
else:
return ''
@property
def col_sw(self) -> str:
"""Return the ansi-colorized status word. Green==OK, Red==Error"""
if self.successful:
return colored(b2h(self.sw), 'green')
return colored(b2h(self.sw), 'red')
else:
return colored(b2h(self.sw), 'red')
@property
def lchan_nr(self) -> int:
"""Logical channel number over which this ApduCommand was transmitted."""
if self.lchan:
return self.lchan.lchan_nr
return lchan_nr_from_cla(self.cla)
else:
return lchan_nr_from_cla(self.cla)
def __str__(self) -> str:
return '%02u %s(%s): %s' % (self.lchan_nr, type(self).__name__, self.path_str, self.to_dict())
@@ -231,7 +236,7 @@ class ApduCommand(Apdu, metaclass=ApduCommandMeta):
"""Fall-back function to be called if there is no derived-class-specific
process_global or process_on_lchan method. Uses information from APDU decode."""
self.processed = {}
if 'p1' not in self.cmd_dict:
if not 'p1' in self.cmd_dict:
self.processed = self.to_dict()
else:
self.processed['p1'] = self.cmd_dict['p1']
@@ -423,7 +428,8 @@ class TpduFilter(ApduHandler):
apdu = Apdu(icmd, tpdu.rsp)
if self.apdu_handler:
return self.apdu_handler.input(apdu)
return Apdu(icmd, tpdu.rsp)
else:
return Apdu(icmd, tpdu.rsp)
def input(self, cmd: bytes, rsp: bytes):
if isinstance(cmd, str):
@@ -446,6 +452,7 @@ class CardReset:
self.atr = atr
def __str__(self):
if self.atr:
if (self.atr):
return '%s(%s)' % (type(self).__name__, b2h(self.atr))
return '%s' % (type(self).__name__)
else:
return '%s' % (type(self).__name__)

View File

@@ -17,16 +17,12 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import Optional, Dict
import logging
from construct import GreedyRange, Struct
from pySim.construct import *
from pySim.filesystem import *
from pySim.runtime import RuntimeLchan
from pySim.apdu import ApduCommand, ApduCommandSet
from pySim.utils import i2h
from typing import Optional, Dict, Tuple
logger = logging.getLogger(__name__)
@@ -107,6 +103,7 @@ class UiccSelect(ApduCommand, n='SELECT', ins=0xA4, cla=['0X', '4X', '6X']):
#print("\tSELECT AID %s" % adf)
else:
logger.warning('SELECT UNKNOWN AID %s', aid)
pass
else:
raise ValueError('Select Mode %s not implemented' % mode)
# decode the SELECT response
@@ -293,9 +290,12 @@ class VerifyPin(ApduCommand, n='VERIFY PIN', ins=0x20, cla=['0X', '4X', '6X']):
@staticmethod
def _pin_is_success(sw):
return bool(sw[0] == 0x63)
if sw[0] == 0x63:
return True
else:
return False
def process_on_lchan(self, _lchan: RuntimeLchan):
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
@@ -307,7 +307,7 @@ class ChangePin(ApduCommand, n='CHANGE PIN', ins=0x24, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, _lchan: RuntimeLchan):
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
@@ -319,7 +319,7 @@ class DisablePin(ApduCommand, n='DISABLE PIN', ins=0x26, cla=['0X', '4X', '6X'])
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, _lchan: RuntimeLchan):
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
@@ -330,7 +330,7 @@ class DisablePin(ApduCommand, n='DISABLE PIN', ins=0x26, cla=['0X', '4X', '6X'])
class EnablePin(ApduCommand, n='ENABLE PIN', ins=0x28, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, _lchan: RuntimeLchan):
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
@@ -342,7 +342,7 @@ class UnblockPin(ApduCommand, n='UNBLOCK PIN', ins=0x2C, cla=['0X', '4X', '6X'])
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, _lchan: RuntimeLchan):
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
@@ -395,12 +395,13 @@ class ManageChannel(ApduCommand, n='MANAGE CHANNEL', ins=0x70, cla=['0X', '4X',
manage_channel.add_lchan(created_channel_nr)
self.col_id = '%02u' % created_channel_nr
return {'mode': mode, 'created_channel': created_channel_nr }
if mode == 'close_channel':
elif mode == 'close_channel':
closed_channel_nr = self.cmd_dict['p2']['logical_channel_number']
rs.del_lchan(closed_channel_nr)
self.col_id = '%02u' % closed_channel_nr
return {'mode': mode, 'closed_channel': closed_channel_nr }
raise ValueError('Unsupported MANAGE CHANNEL P1=%02X' % self.p1)
else:
raise ValueError('Unsupported MANAGE CHANNEL P1=%02X' % self.p1)
# TS 102 221 Section 11.1.18
class GetChallenge(ApduCommand, n='GET CHALLENGE', ins=0x84, cla=['0X', '4X', '6X']):
@@ -418,13 +419,13 @@ class ManageSecureChannel(ApduCommand, n='MANAGE SECURE CHANNEL', ins=0x73, cla=
p2 = hdr[3]
if p1 & 0x7 == 0: # retrieve UICC Endpoints
return 2
if p1 & 0xf in [1,2,3]: # establish sa, start secure channel SA
elif p1 & 0xf in [1,2,3]: # establish sa, start secure channel SA
p2_cmd = p2 >> 5
if p2_cmd in [0,2,4]: # command data
return 3
if p2_cmd in [1,3,5]: # response data
elif p2_cmd in [1,3,5]: # response data
return 2
if p1 & 0xf == 4: # terminate secure channel SA
elif p1 & 0xf == 4: # terminate secure channel SA
return 3
raise ValueError('%s: Unable to detect APDU case for %s' % (cls.__name__, b2h(hdr)))
@@ -435,7 +436,8 @@ class TransactData(ApduCommand, n='TRANSACT DATA', ins=0x75, cla=['0X', '4X', '6
p1 = hdr[2]
if p1 & 0x04:
return 3
return 2
else:
return 2
# TS 102 221 Section 11.1.22
class SuspendUicc(ApduCommand, n='SUSPEND UICC', ins=0x76, cla=['80']):

View File

@@ -9,12 +9,12 @@ APDU commands of 3GPP TS 31.102 V16.6.0
"""
from typing import Dict
from construct import BitStruct, Enum, BitsInteger, Int8ub, Bytes, this, Struct, If, Switch, Const
from construct import *
from construct import Optional as COptional
from pySim.filesystem import *
from pySim.construct import *
from pySim.ts_31_102 import SUCI_TlvDataObject
from pySim.apdu import ApduCommand, ApduCommandSet
# Copyright (C) 2022 Harald Welte <laforge@osmocom.org>
@@ -35,6 +35,8 @@ from pySim.apdu import ApduCommand, ApduCommandSet
# Mapping between USIM Service Number and its description
from pySim.apdu import ApduCommand, ApduCommandSet
# TS 31.102 Section 7.1
class UsimAuthenticateEven(ApduCommand, n='AUTHENTICATE', ins=0x88, cla=['0X', '4X', '6X']):
_apdu_case = 4

View File

@@ -14,6 +14,7 @@ class ApduSource(abc.ABC):
@abc.abstractmethod
def read_packet(self) -> PacketType:
"""Read one packet from the source."""
pass
def read(self) -> Union[Apdu, CardReset]:
"""Main function to call by the user: Blocking read, returns Apdu or CardReset."""
@@ -30,5 +31,5 @@ class ApduSource(abc.ABC):
elif isinstance(r, CardReset):
apdu = r
else:
raise ValueError('Unknown read_packet() return %s' % r)
ValueError('Unknown read_packet() return %s' % r)
return apdu

View File

@@ -16,14 +16,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.gsmtap import GsmtapSource
from pySim.gsmtap import GsmtapMessage, GsmtapSource
from . import ApduSource, PacketType, CardReset
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UsimApduCommands + GpApduCommands
class GsmtapApduSource(ApduSource):
@@ -43,16 +41,16 @@ class GsmtapApduSource(ApduSource):
self.gsmtap = GsmtapSource(bind_ip, bind_port)
def read_packet(self) -> PacketType:
gsmtap_msg, _addr = self.gsmtap.read_packet()
gsmtap_msg, addr = self.gsmtap.read_packet()
if gsmtap_msg['type'] != 'sim':
raise ValueError('Unsupported GSMTAP type %s' % gsmtap_msg['type'])
sub_type = gsmtap_msg['sub_type']
if sub_type == 'apdu':
return ApduCommands.parse_cmd_bytes(gsmtap_msg['body'])
if sub_type == 'atr':
elif sub_type == 'atr':
# card has been reset
return CardReset(gsmtap_msg['body'])
if sub_type in ['pps_req', 'pps_rsp']:
elif sub_type in ['pps_req', 'pps_rsp']:
# simply ignore for now
pass
else:

View File

@@ -16,19 +16,20 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import logging
from pprint import pprint as pp
from typing import Tuple
import pyshark
from pySim.utils import h2b
from pySim.utils import h2b, b2h
from pySim.apdu import Tpdu
from pySim.gsmtap import GsmtapMessage
from . import ApduSource, PacketType, CardReset
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
from . import ApduSource, PacketType, CardReset
ApduCommands = UiccApduCommands + UsimApduCommands + GpApduCommands
logger = logging.getLogger(__name__)
@@ -66,10 +67,10 @@ class _PysharkGsmtap(ApduSource):
sub_type = gsmtap_msg['sub_type']
if sub_type == 'apdu':
return ApduCommands.parse_cmd_bytes(gsmtap_msg['body'])
if sub_type == 'atr':
elif sub_type == 'atr':
# card has been reset
return CardReset(gsmtap_msg['body'])
if sub_type in ['pps_req', 'pps_rsp']:
elif sub_type in ['pps_req', 'pps_rsp']:
# simply ignore for now
pass
else:
@@ -86,3 +87,4 @@ class PysharkGsmtapPcap(_PysharkGsmtap):
"""
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='gsm_sim', use_json=True, keep_packets=False)
super().__init__(pyshark_inst)

View File

@@ -16,11 +16,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import logging
from pprint import pprint as pp
from typing import Tuple
import pyshark
from pySim.utils import h2b
from pySim.utils import h2b, b2h
from pySim.apdu import Tpdu
from . import ApduSource, PacketType, CardReset

View File

@@ -20,6 +20,7 @@ from pySim.transport import LinkBase
from pySim.commands import SimCardCommands
from pySim.filesystem import CardModel, CardApplication
from pySim.cards import card_detect, SimCardBase, UiccCardBase
from pySim.exceptions import NoCardError
from pySim.runtime import RuntimeState
from pySim.profile import CardProfile
from pySim.cdma_ruim import CardProfileRUIM
@@ -107,3 +108,6 @@ def init_card(sl: LinkBase) -> Tuple[RuntimeState, SimCardBase]:
sl.set_sw_interpreter(rs)
return rs, card

View File

@@ -26,12 +26,11 @@ Support for the Secure Element Access Control, specifically the ARA-M inside an
#
from construct import GreedyBytes, GreedyString, Struct, Enum, Int8ub, Int16ub
from construct import *
from construct import Optional as COptional
from pySim.construct import *
from pySim.filesystem import *
from pySim.tlv import *
from pySim.utils import Hexstr
import pySim.global_platform
# various BER-TLV encoded Data Objects (DOs)
@@ -69,10 +68,11 @@ class ApduArDO(BER_TLV_IE, tag=0xd0):
if do[0] == 0x00:
self.decoded = {'generic_access_rule': 'never'}
return self.decoded
if do[0] == 0x01:
elif do[0] == 0x01:
self.decoded = {'generic_access_rule': 'always'}
return self.decoded
return ValueError('Invalid 1-byte generic APDU access rule')
else:
return ValueError('Invalid 1-byte generic APDU access rule')
else:
if len(do) % 8:
return ValueError('Invalid non-modulo-8 length of APDU filter: %d' % len(do))
@@ -88,9 +88,10 @@ class ApduArDO(BER_TLV_IE, tag=0xd0):
if 'generic_access_rule' in self.decoded:
if self.decoded['generic_access_rule'] == 'never':
return b'\x00'
if self.decoded['generic_access_rule'] == 'always':
elif self.decoded['generic_access_rule'] == 'always':
return b'\x01'
return ValueError('Invalid 1-byte generic APDU access rule')
else:
return ValueError('Invalid 1-byte generic APDU access rule')
else:
if not 'apdu_filter' in self.decoded:
return ValueError('Invalid APDU AR DO')
@@ -276,13 +277,14 @@ class ADF_ARAM(CardADF):
cmd_do_enc = b''
cmd_do_len = 0
c_apdu = hdr + ('%02x' % cmd_do_len) + b2h(cmd_do_enc)
(data, _sw) = tp.send_apdu_checksw(c_apdu, exp_sw)
(data, sw) = tp.send_apdu_checksw(c_apdu, exp_sw)
if data:
if resp_cls:
resp_do = resp_cls()
resp_do.from_tlv(h2b(data))
return resp_do
return data
else:
return data
else:
return None
@@ -304,13 +306,16 @@ class ADF_ARAM(CardADF):
@with_default_category('Application-Specific Commands')
class AddlShellCommands(CommandSet):
def do_aram_get_all(self, _opts):
def __init(self):
super().__init__()
def do_aram_get_all(self, opts):
"""GET DATA [All] on the ARA-M Applet"""
res_do = ADF_ARAM.get_all(self._cmd.lchan.scc._tp)
if res_do:
self._cmd.poutput_json(res_do.to_dict())
def do_aram_get_config(self, _opts):
def do_aram_get_config(self, opts):
"""Perform GET DATA [Config] on the ARA-M Applet: Tell it our version and retrieve its version."""
res_do = ADF_ARAM.get_config(self._cmd.lchan.scc._tp)
if res_do:
@@ -348,7 +353,7 @@ class ADF_ARAM(CardADF):
"""Perform STORE DATA [Command-Store-REF-AR-DO] to store a (new) access rule."""
# REF
ref_do_content = []
if opts.aid is not None:
if opts.aid != None:
ref_do_content += [{'aid_ref_do': opts.aid}]
elif opts.aid_empty:
ref_do_content += [{'aid_ref_empty_do': None}]
@@ -377,7 +382,7 @@ class ADF_ARAM(CardADF):
if res_do:
self._cmd.poutput_json(res_do.to_dict())
def do_aram_delete_all(self, _opts):
def do_aram_delete_all(self, opts):
"""Perform STORE DATA [Command-Delete[all]] to delete all access rules."""
deldo = CommandDelete()
res_do = ADF_ARAM.store_data(self._cmd.lchan.scc._tp, deldo)

View File

@@ -24,11 +24,12 @@ there are also automatic card feeders.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pySim.transport import LinkBase
import subprocess
import sys
import yaml
from pySim.transport import LinkBase
class CardHandlerBase:
"""Abstract base class representing a mechanism for card insertion/removal."""
@@ -96,7 +97,7 @@ class CardHandlerAuto(CardHandlerBase):
print("Card handler Config-file: " + str(config_file))
with open(config_file) as cfg:
self.cmds = yaml.load(cfg, Loader=yaml.FullLoader)
self.verbose = self.cmds.get('verbose') is True
self.verbose = (self.cmds.get('verbose') == True)
def __print_outout(self, out):
print("")

View File

@@ -54,11 +54,11 @@ class CardKeyProvider(abc.ABC):
dictionary of {field, value} strings for each requested field from 'fields'
"""
for f in fields:
if f not in self.VALID_FIELD_NAMES:
if (f not in self.VALID_FIELD_NAMES):
raise ValueError("Requested field name '%s' is not a valid field name, valid field names are: %s" %
(f, str(self.VALID_FIELD_NAMES)))
if key not in self.VALID_FIELD_NAMES:
if (key not in self.VALID_FIELD_NAMES):
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
(key, str(self.VALID_FIELD_NAMES)))

View File

@@ -22,9 +22,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Optional, Tuple
from typing import Optional, Dict, Tuple
from pySim.ts_102_221 import EF_DIR
from pySim.ts_51_011 import DF_GSM
import abc
from pySim.utils import *
from pySim.commands import Path, SimCardCommands
@@ -39,7 +40,8 @@ class CardBase:
rc = self._scc.reset_card()
if rc == 1:
return self._scc.get_atr()
return None
else:
return None
def set_apdu_parameter(self, cla: Hexstr, sel_ctrl: Hexstr) -> None:
"""Set apdu parameters (class byte and selection control bytes)"""
@@ -52,6 +54,7 @@ class CardBase:
def erase(self):
print("warning: erasing is not supported for specified card type!")
return
def file_exists(self, fid: Path) -> bool:
res_arr = self._scc.try_select_path(fid)
@@ -72,7 +75,7 @@ class SimCardBase(CardBase):
name = 'SIM'
def __init__(self, scc: SimCardCommands):
super().__init__(scc)
super(SimCardBase, self).__init__(scc)
self._scc.cla_byte = "A0"
self._scc.sel_ctrl = "0000"
@@ -85,7 +88,7 @@ class UiccCardBase(SimCardBase):
name = 'UICC'
def __init__(self, scc: SimCardCommands):
super().__init__(scc)
super(UiccCardBase, self).__init__(scc)
self._scc.cla_byte = "00"
self._scc.sel_ctrl = "0004" # request an FCP
# See also: ETSI TS 102 221, Table 9.3
@@ -155,8 +158,9 @@ class UiccCardBase(SimCardBase):
aid_full = self._complete_aid(aid)
if aid_full:
return scc.select_adf(aid_full)
# If we cannot get the full AID, try with short AID
return scc.select_adf(aid)
else:
# If we cannot get the full AID, try with short AID
return scc.select_adf(aid)
return (None, None)
def card_detect(scc: SimCardCommands) -> Optional[CardBase]:

View File

@@ -18,14 +18,14 @@ as described in 3GPP TS 31.111."""
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List
from bidict import bidict
from construct import Int8ub, Int16ub, Byte, Bytes, BitsInteger
from construct import Struct, Enum, BitStruct, this
from construct import GreedyBytes, Switch, GreedyRange, FlagsEnum
from typing import List
from pySim.utils import b2h, h2b, dec_xplmn_w_act
from pySim.tlv import TLV_IE, COMPR_TLV_IE, BER_TLV_IE, TLV_IE_Collection
from pySim.construct import PlmnAdapter, BcdAdapter, HexAdapter, GsmStringAdapter, TonNpi
from pySim.utils import b2h, dec_xplmn_w_act
from construct import Int8ub, Int16ub, Byte, Bytes, Bit, Flag, BitsInteger
from construct import Struct, Enum, Tell, BitStruct, this, Padding, RepeatUntil
from construct import GreedyBytes, Switch, GreedyRange, FlagsEnum
# Tag values as per TS 101 220 Table 7.23
@@ -583,11 +583,11 @@ class ActivateDescriptor(COMPR_TLV_IE, tag=0xFB):
# TS 31.111 Section 8.90
class PlmnWactList(COMPR_TLV_IE, tag=0xF2):
def _from_bytes(self, do: bytes):
def _from_bytes(self, x):
r = []
i = 0
while i < len(do):
r.append(dec_xplmn_w_act(b2h(do[i:i+5])))
while i < len(x):
r.append(dec_xplmn_w_act(b2h(x[i:i+5])))
i += 5
return r
@@ -978,8 +978,8 @@ class ProactiveCommandBase(BER_TLV_IE, tag=0xD0, nested=[CommandDetails]):
for c in self.children:
if type(c).__name__ == 'CommandDetails':
return c
else:
return None
else:
return None
class ProactiveCommand(TLV_IE_Collection,
nested=[Refresh, MoreTime, PollInterval, PollingOff, SetUpEventList, SetUpCall,
@@ -997,7 +997,7 @@ class ProactiveCommand(TLV_IE_Collection,
more difficult than any normal TLV IE Collection, because the content of one of the IEs defines the
definitions of all the other IEs. So we first need to find the CommandDetails, and then parse according
to the command type indicated in that IE data."""
def from_bytes(self, binary: bytes, context: dict = {}) -> List[TLV_IE]:
def from_bytes(self, binary: bytes) -> List[TLV_IE]:
# do a first parse step to get the CommandDetails
pcmd = ProactiveCommandBase()
pcmd.from_tlv(binary)
@@ -1007,7 +1007,7 @@ class ProactiveCommand(TLV_IE_Collection,
if cmd_type in self.members_by_tag:
cls = self.members_by_tag[cmd_type]
inst = cls()
_dec, remainder = inst.from_tlv(binary)
dec, remainder = inst.from_tlv(binary)
self.decoded = inst
else:
self.decoded = pcmd
@@ -1019,7 +1019,7 @@ class ProactiveCommand(TLV_IE_Collection,
def to_dict(self):
return self.decoded.to_dict()
def to_bytes(self, context: dict = {}):
def to_bytes(self):
return self.decoded.to_tlv()

View File

@@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import enum
from construct import Bytewise, BitStruct, BitsInteger, Struct, FlagsEnum
from pySim.utils import *
from pySim.filesystem import *
from pySim.profile import match_ruim
@@ -29,6 +27,7 @@ from pySim.ts_51_011 import CardProfileSIM
from pySim.ts_51_011 import DF_TELECOM, DF_GSM
from pySim.ts_51_011 import EF_ServiceTable
from pySim.construct import *
from construct import *
# Mapping between CDMA Service Number and its description
@@ -186,9 +185,9 @@ class CardProfileRUIM(CardProfile):
sel_ctrl="0000", files_in_mf=[DF_TELECOM(), DF_GSM(), DF_CDMA()])
@staticmethod
def decode_select_response(data_hex: str) -> object:
def decode_select_response(resp_hex: str) -> object:
# TODO: Response parameters/data in case of DF_CDMA (section 2.6)
return CardProfileSIM.decode_select_response(data_hex)
return CardProfileSIM.decode_select_response(resp_hex)
@staticmethod
def match_with_card(scc: SimCardCommands) -> bool:

View File

@@ -21,13 +21,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List, Tuple
from typing import List, Optional, Tuple
import typing # construct also has a Union, so we do typing.Union below
from construct import Construct, Struct, Const, Select
from construct import Optional as COptional
from construct import *
from pySim.construct import LV, filter_dict
from pySim.utils import rpad, lpad, b2h, h2b, sw_match, bertlv_encode_len, h2i, i2h, str_sanitize, expand_hex, SwMatchstr
from pySim.utils import rpad, lpad, b2h, h2b, sw_match, bertlv_encode_len, Hexstr, h2i, i2h, str_sanitize, expand_hex, SwMatchstr
from pySim.utils import Hexstr, SwHexstr, ResTuple
from pySim.exceptions import SwMatchError
from pySim.transport import LinkBase
@@ -84,14 +83,6 @@ class SimCardCommands:
"""Return the (cached) patched default CLA byte for this card."""
return self._cla4lchan
@property
def max_cmd_len(self) -> int:
"""Maximum length of the command apdu data section. Depends on secure channel protocol used."""
if self.scp:
return 255 - self.scp.overhead
else:
return 255
@cla_byte.setter
def cla_byte(self, new_val: Hexstr):
"""Set the (raw, without lchan) default CLA value for this card."""
@@ -258,7 +249,7 @@ class SimCardCommands:
"""
rv = []
if not isinstance(dir_list, list):
if type(dir_list) is not list:
dir_list = [dir_list]
for i in dir_list:
data, sw = self.send_apdu(self.cla_byte + "a4" + self.sel_ctrl + "02" + i)
@@ -277,10 +268,10 @@ class SimCardCommands:
list of return values (FCP in hex encoding) for each element of the path
"""
rv = []
if not isinstance(dir_list, list):
if type(dir_list) is not list:
dir_list = [dir_list]
for i in dir_list:
data, _sw = self.select_file(i)
data, sw = self.select_file(i)
rv.append(data)
return rv
@@ -326,14 +317,14 @@ class SimCardCommands:
total_data = ''
chunk_offset = 0
while chunk_offset < length:
chunk_len = min(self.max_cmd_len, length-chunk_offset)
chunk_len = min(255, length-chunk_offset)
pdu = self.cla_byte + \
'b0%04x%02x' % (offset + chunk_offset, chunk_len)
try:
data, sw = self.send_apdu_checksw(pdu)
except Exception as e:
raise ValueError('%s, failed to read (offset %d)' %
(str_sanitize(str(e)), offset)) from e
(str_sanitize(str(e)), offset))
total_data += data
chunk_offset += chunk_len
return total_data, sw
@@ -384,7 +375,7 @@ class SimCardCommands:
total_data = ''
chunk_offset = 0
while chunk_offset < data_length:
chunk_len = min(self.max_cmd_len, data_length - chunk_offset)
chunk_len = min(255, data_length - chunk_offset)
# chunk_offset is bytes, but data slicing is hex chars, so we need to multiply by 2
pdu = self.cla_byte + \
'd6%04x%02x' % (offset + chunk_offset, chunk_len) + \
@@ -393,7 +384,7 @@ class SimCardCommands:
chunk_data, chunk_sw = self.send_apdu_checksw(pdu)
except Exception as e:
raise ValueError('%s, failed to write chunk (chunk_offset %d, chunk_len %d)' %
(str_sanitize(str(e)), chunk_offset, chunk_len)) from e
(str_sanitize(str(e)), chunk_offset, chunk_len))
total_data += data
chunk_offset += chunk_len
if verify:
@@ -449,10 +440,10 @@ class SimCardCommands:
else:
# make sure the input data is padded to the record length using 0xFF.
# In cases where the input data exceed we throw an exception.
if len(data) // 2 > rec_length:
if (len(data) // 2 > rec_length):
raise ValueError('Data length exceeds record length (expected max %d, got %d)' % (
rec_length, len(data) // 2))
elif len(data) // 2 < rec_length:
elif (len(data) // 2 < rec_length):
if leftpad:
data = lpad(data, rec_length * 2)
else:
@@ -527,7 +518,7 @@ class SimCardCommands:
# retrieve first block
data, sw = self._retrieve_data(tag, first=True)
total_data += data
while sw in ['62f1', '62f2']:
while sw == '62f1' or sw == '62f2':
data, sw = self._retrieve_data(tag, first=False)
total_data += data
return total_data, sw
@@ -538,7 +529,7 @@ class SimCardCommands:
p1 = 0x80
else:
p1 = 0x00
if isinstance(data, (bytes, bytearray)):
if isinstance(data, bytes) or isinstance(data, bytearray):
data = b2h(data)
pdu = self.cla4lchan('80') + 'db00%02x%02x%s' % (p1, len(data)//2, data)
return self.send_apdu_checksw(pdu)
@@ -568,10 +559,10 @@ class SimCardCommands:
total_len = len(tlv_bin)
remaining = tlv_bin
while len(remaining) > 0:
fragment = remaining[:self.max_cmd_len]
fragment = remaining[:255]
rdata, sw = self._set_data(fragment, first=first)
first = False
remaining = remaining[self.max_cmd_len:]
remaining = remaining[255:]
return rdata, sw
def run_gsm(self, rand: Hexstr) -> ResTuple:
@@ -594,9 +585,10 @@ class SimCardCommands:
context : 16 byte random data ('3g' or 'gsm')
"""
# 3GPP TS 31.102 Section 7.1.2.1
AuthCmd3G = Struct('rand'/LV, 'autn'/COptional(LV))
AuthCmd3G = Struct('rand'/LV, 'autn'/Optional(LV))
AuthResp3GSyncFail = Struct(Const(b'\xDC'), 'auts'/LV)
AuthResp3GSuccess = Struct(Const(b'\xDB'), 'res'/LV, 'ck'/LV, 'ik'/LV, 'kc'/COptional(LV))
AuthResp3GSuccess = Struct(
Const(b'\xDB'), 'res'/LV, 'ck'/LV, 'ik'/LV, 'kc'/Optional(LV))
AuthResp3G = Select(AuthResp3GSyncFail, AuthResp3GSuccess)
# build parameters
cmd_data = {'rand': rand, 'autn': autn}
@@ -674,7 +666,7 @@ class SimCardCommands:
if sw_match(sw, '63cx'):
raise RuntimeError('Failed to %s chv_no 0x%02X with code 0x%s, %i tries left.' %
(op_name, chv_no, b2h(pin_code).upper(), int(sw[3])))
if sw != '9000':
elif (sw != '9000'):
raise SwMatchError(sw, '9000')
def verify_chv(self, chv_no: int, code: Hexstr) -> ResTuple:
@@ -769,28 +761,30 @@ class SimCardCommands:
def encode_duration(secs: int) -> Hexstr:
if secs >= 10*24*60*60:
return '04%02x' % (secs // (10*24*60*60))
if secs >= 24*60*60:
elif secs >= 24*60*60:
return '03%02x' % (secs // (24*60*60))
if secs >= 60*60:
elif secs >= 60*60:
return '02%02x' % (secs // (60*60))
if secs >= 60:
elif secs >= 60:
return '01%02x' % (secs // 60)
return '00%02x' % secs
else:
return '00%02x' % secs
def decode_duration(enc: Hexstr) -> int:
time_unit = enc[:2]
length = h2i(enc[2:4])[0]
if time_unit == '04':
return length * 10*24*60*60
if time_unit == '03':
elif time_unit == '03':
return length * 24*60*60
if time_unit == '02':
elif time_unit == '02':
return length * 60*60
if time_unit == '01':
elif time_unit == '01':
return length * 60
if time_unit == '00':
elif time_unit == '00':
return length
raise ValueError('Time unit must be 0x00..0x04')
else:
raise ValueError('Time unit must be 0x00..0x04')
min_dur_enc = encode_duration(min_len_secs)
max_dur_enc = encode_duration(max_len_secs)
data, sw = self.send_apdu_checksw('8076000004' + min_dur_enc + max_dur_enc)

View File

@@ -1,20 +1,15 @@
"""Utility code related to the integration of the 'construct' declarative parser."""
from construct.lib.containers import Container, ListContainer
from construct.core import EnumIntegerString
import typing
from construct import *
from construct.core import evaluate, BitwisableString
from construct.lib import integertypes
from pySim.utils import b2h, h2b, swap_nibbles
import gsm0338
import codecs
import ipaddress
import gsm0338
from construct.lib.containers import Container, ListContainer
from construct.core import EnumIntegerString
from construct import Adapter, Prefixed, Int8ub, GreedyBytes, Default, Flag, Byte, Construct, Enum
from construct import BitsInteger, BitStruct, Bytes, StreamError, stream_read_entire, stream_write
from construct import SizeofError, IntegerError, swapbytes
from construct.core import evaluate
from construct.lib import integertypes
from pySim.utils import b2h, h2b, swap_nibbles
"""Utility code related to the integration of the 'construct' declarative parser."""
# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
#
@@ -47,7 +42,7 @@ class Utf8Adapter(Adapter):
def _decode(self, obj, context, path):
# In case the string contains only 0xff bytes we interpret it as an empty string
if obj == b'\xff' * len(obj):
return ""
return ""
return codecs.decode(obj, "utf-8")
def _encode(self, obj, context, path):
@@ -59,7 +54,7 @@ class GsmOrUcs2Adapter(Adapter):
def _decode(self, obj, context, path):
# In case the string contains only 0xff bytes we interpret it as an empty string
if obj == b'\xff' * len(obj):
return ""
return ""
# one of the magic bytes of TS 102 221 Annex A
if obj[0] in [0x80, 0x81, 0x82]:
ad = Ucs2Adapter(GreedyBytes)
@@ -82,7 +77,7 @@ class Ucs2Adapter(Adapter):
def _decode(self, obj, context, path):
# In case the string contains only 0xff bytes we interpret it as an empty string
if obj == b'\xff' * len(obj):
return ""
return ""
if obj[0] == 0x80:
# TS 102 221 Annex A Variant 1
return codecs.decode(obj[1:], 'utf_16_be')
@@ -180,7 +175,7 @@ class Ucs2Adapter(Adapter):
def _encode_variant1(instr: str) -> bytes:
"""Encode according to TS 102 221 Annex A Variant 1"""
return b'\x80' + codecs.encode(instr, 'utf_16_be')
return b'\x80' + codecs.encode(obj, 'utf_16_be')
def _encode_variant2(instr: str) -> bytes:
"""Encode according to TS 102 221 Annex A Variant 2"""
@@ -199,7 +194,7 @@ class Ucs2Adapter(Adapter):
assert codepoint_prefix == c_prefix
enc = (0x80 + (c_codepoint & 0x7f)).to_bytes(1, byteorder='big')
chars += enc
if codepoint_prefix is None:
if codepoint_prefix == None:
codepoint_prefix = 0
return hdr + codepoint_prefix.to_bytes(1, byteorder='big') + chars
@@ -269,9 +264,9 @@ class InvertAdapter(Adapter):
# skip all private entries
if k.startswith('_'):
continue
if v is False:
if v == False:
obj[k] = True
elif v is True:
elif v == True:
obj[k] = False
return obj
@@ -368,37 +363,6 @@ class Ipv6Adapter(Adapter):
ia = ipaddress.IPv6Address(obj)
return ia.packed
class StripTrailerAdapter(Adapter):
"""
Encoder removes all trailing bytes matching the default_value
Decoder pads input data up to total_length with default_value
This is used in constellations like "FlagsEnum(StripTrailerAdapter(GreedyBytes, 3), ..."
where you have a bit-mask that may have 1, 2 or 3 bytes, depending on whether or not any
of the LSBs are actually set.
"""
def __init__(self, subcon, total_length:int, default_value=b'\x00', min_len=1):
super().__init__(subcon)
assert len(default_value) == 1
self.total_length = total_length
self.default_value = default_value
self.min_len = min_len
def _decode(self, obj, context, path):
assert isinstance(obj, bytes)
# pad with suppressed/missing bytes
if len(obj) < self.total_length:
obj += self.default_value * (self.total_length - len(obj))
return int.from_bytes(obj, 'big')
def _encode(self, obj, context, path):
assert isinstance(obj, int)
obj = obj.to_bytes(self.total_length, 'big')
# remove trailing bytes if they are zero
while len(obj) > self.min_len and obj[-1] == self.default_value[0]:
obj = obj[:-1]
return obj
def filter_dict(d, exclude_prefix='_'):
"""filter the input dict to ensure no keys starting with 'exclude_prefix' remain."""
@@ -408,20 +372,20 @@ def filter_dict(d, exclude_prefix='_'):
for (key, value) in d.items():
if key.startswith(exclude_prefix):
continue
if isinstance(value, dict):
if type(value) is dict:
res[key] = filter_dict(value)
else:
res[key] = value
return res
def normalize_construct(c, exclude_prefix: str = '_'):
def normalize_construct(c):
"""Convert a construct specific type to a related base type, mostly useful
so we can serialize it."""
# we need to include the filter_dict as we otherwise get elements like this
# in the dict: '_io': <_io.BytesIO object at 0x7fdb64e05860> which we cannot json-serialize
c = filter_dict(c, exclude_prefix)
if isinstance(c, (Container, dict)):
c = filter_dict(c)
if isinstance(c, Container) or isinstance(c, dict):
r = {k: normalize_construct(v) for (k, v) in c.items()}
elif isinstance(c, ListContainer):
r = [normalize_construct(x) for x in c]
@@ -444,11 +408,11 @@ def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None,
# if the input is all-ff, this means the content is undefined. Let's avoid passing StreamError
# exceptions in those situations (which might occur if a length field 0xff is 255 but then there's
# actually less bytes in the remainder of the file.
if all(v == 0xff for v in raw_bin_data):
if all([v == 0xff for v in raw_bin_data]):
return None
else:
raise e
return normalize_construct(parsed, exclude_prefix)
return normalize_construct(parsed)
def build_construct(c, decoded_data, context: dict = {}):
"""Helper function to handle total_len."""
@@ -561,7 +525,8 @@ class GreedyInteger(Construct):
# round up to the minimum number
# of bytes we anticipate
nbytes = max(nbytes, minlen)
if nbytes < minlen:
nbytes = minlen
return nbytes
@@ -572,7 +537,7 @@ class GreedyInteger(Construct):
try:
data = obj.to_bytes(length, byteorder='big', signed=self.signed)
except ValueError as e:
raise IntegerError(str(e), path=path) from e
raise IntegerError(str(e), path=path)
if evaluate(self.swapped, context):
data = swapbytes(data)
stream_write(stream, data, length, path)

View File

@@ -2,10 +2,10 @@ import sys
from typing import Optional
from importlib import resources
import asn1tools
def compile_asn1_subdir(subdir_name:str):
"""Helper function that compiles ASN.1 syntax from all files within given subdir"""
import asn1tools
asn_txt = ''
__ver = sys.version_info
if (__ver.major, __ver.minor) >= (3, 9):

View File

@@ -983,7 +983,7 @@ keyAccess [22] OCTET STRING (SIZE (1)) DEFAULT '00'H,
keyIdentifier [2] OCTET STRING (SIZE (1)),
keyVersionNumber [3] OCTET STRING (SIZE (1)),
keyCounterValue [5] OCTET STRING OPTIONAL,
keyComponents SEQUENCE (SIZE (1..MAX)) OF SEQUENCE {
keyCompontents SEQUENCE (SIZE (1..MAX)) OF SEQUENCE {
keyType [0] OCTET STRING,
keyData [6] OCTET STRING,
macLength[7] UInt8 DEFAULT 8

View File

@@ -23,6 +23,7 @@
# SGP.22 v3.0 Section 2.5.3:
# That block of data is split into segments of a maximum size of 1020 bytes (including the tag, length field and MAC).
MAX_SEGMENT_SIZE = 1020
import abc
from typing import List
@@ -41,8 +42,6 @@ from pySim.utils import bertlv_encode_len, bertlv_parse_one, b2h
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
MAX_SEGMENT_SIZE = 1020
class BspAlgo(abc.ABC):
blocksize: int
@@ -51,11 +50,11 @@ class BspAlgo(abc.ABC):
if in_len % multiple == 0:
return b''
pad_cnt = multiple - (in_len % multiple)
return bytes([padding]) * pad_cnt
return b'\x00' * pad_cnt
def _pad_to_multiple(self, indat: bytes, multiple: int, padding: int = 0) -> bytes:
"""Pad the input data to multiples of 'multiple'."""
return indat + self._get_padding(len(indat), multiple, padding)
return indat + self._get_padding(len(indat), self.blocksize, padding)
def __str__(self):
return self.__class__.__name__
@@ -82,14 +81,17 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
@abc.abstractmethod
def _unpad(self, padded: bytes) -> bytes:
"""Remove the padding from padded data."""
pass
@abc.abstractmethod
def _encrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@abc.abstractmethod
def _decrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
class BspAlgoCryptAES128(BspAlgoCrypt):
name = 'AES-CBC-128'
@@ -164,6 +166,7 @@ class BspAlgoMac(BspAlgo, abc.ABC):
@abc.abstractmethod
def _auth(self, temp_data: bytes) -> bytes:
"""To be implemented by algorithm specific derived class."""
pass
class BspAlgoMacAES128(BspAlgoMac):
name = 'AES-CMAC-128'
@@ -286,7 +289,7 @@ class BspInstance:
def demac_only_one(self, ciphertext: bytes) -> bytes:
payload = self.m_algo.verify(ciphertext)
_tdict, _l, val, _remain = bertlv_parse_one(payload)
tdict, l, val, remain = bertlv_parse_one(payload)
return val
def demac_only(self, ciphertext_list: List[bytes]) -> bytes:

View File

@@ -1,458 +0,0 @@
"""GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5"""
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import requests
import logging
import json
from datetime import datetime
import time
import base64
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class ApiParam(abc.ABC):
"""A class reprsenting a single parameter in the ES2+ API."""
@classmethod
def verify_decoded(cls, data):
"""Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd."""
pass
@classmethod
def verify_encoded(cls, data):
"""Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd."""
pass
@classmethod
def encode(cls, data):
"""[Validate and] Encode the given value."""
cls.verify_decoded(data)
encoded = cls._encode(data)
cls.verify_decoded(encoded)
return encoded
@classmethod
def _encode(cls, data):
"""encoder function, typically [but not always] overridden by derived class."""
return data
@classmethod
def decode(cls, data):
"""[Validate and] Decode the given value."""
cls.verify_encoded(data)
decoded = cls._decode(data)
cls.verify_decoded(decoded)
return decoded
@classmethod
def _decode(cls, data):
"""decoder function, typically [but not always] overridden by derived class."""
return data
class ApiParamString(ApiParam):
"""Base class representing an API parameter of 'string' type."""
pass
class ApiParamInteger(ApiParam):
"""Base class representing an API parameter of 'integer' type."""
@classmethod
def _decode(cls, data):
return int(data)
@classmethod
def _encode(cls, data):
return str(data)
@classmethod
def verify_decoded(cls, data):
if not isinstance(data, int):
raise TypeError('Expected an integer input data type')
@classmethod
def verify_encoded(cls, data):
if not data.isdecimal():
raise ValueError('integer (%s) contains non-decimal characters' % data)
assert str(int(data)) == data
class ApiParamBoolean(ApiParam):
"""Base class representing an API parameter of 'boolean' type."""
@classmethod
def _encode(cls, data):
return bool(data)
class ApiParamFqdn(ApiParam):
"""String, as a list of domain labels concatenated using the full stop (dot, period) character as
separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5
of ISO/IEC 18004"""
@classmethod
def verify_encoded(cls, data):
# FIXME
pass
class param:
class Iccid(ApiParamString):
"""String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
character F."""
@classmethod
def _encode(cls, data):
data = str(data)
# SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always
# encode it with padding F at the end.
if len(data) == 19:
data += 'F'
return data
@classmethod
def verify_encoded(cls, data):
if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
@classmethod
def _decode(cls, data):
# strip trailing padding (if it's 20 digits)
if len(data) == 20 and data[-1] in ['F', 'f']:
data = data[:-1]
return data
@classmethod
def verify_decoded(cls, data):
data = str(data)
if len(data) not in [19, 20]:
raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
if len(data) == 19:
decimal_part = data
else:
decimal_part = data[:-1]
final_part = data[-1:]
if final_part not in ['F', 'f'] and not final_part.isdecimal():
raise ValueError('ICCID (%s) contains non-decimal characters' % data)
if not decimal_part.isdecimal():
raise ValueError('ICCID (%s) contains non-decimal characters' % data)
class Eid(ApiParamString):
"""String of 32 decimal characters"""
@classmethod
def verify_encoded(cls, data):
if len(data) != 32:
raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data)))
@classmethod
def verify_decoded(cls, data):
if not data.isdecimal():
raise ValueError('EID (%s) contains non-decimal characters' % data)
class ProfileType(ApiParamString):
pass
class MatchingId(ApiParamString):
pass
class ConfirmationCode(ApiParamString):
pass
class SmdsAddress(ApiParamFqdn):
pass
class SmdpAddress(ApiParamFqdn):
pass
class ReleaseFlag(ApiParamBoolean):
pass
class FinalProfileStatusIndicator(ApiParamString):
pass
class Timestamp(ApiParamString):
"""String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD"""
@classmethod
def _decode(cls, data):
return datetime.fromisoformat(data)
@classmethod
def _encode(cls, data):
return datetime.toisoformat(data)
class NotificationPointId(ApiParamInteger):
pass
class NotificationPointStatus(ApiParam):
pass
class ResultData(ApiParam):
@classmethod
def _decode(cls, data):
return base64.b64decode(data)
@classmethod
def _encode(cls, data):
return base64.b64encode(data)
class JsonResponseHeader(ApiParam):
"""SGP.22 section 6.5.1.4."""
@classmethod
def verify_decoded(cls, data):
fe_status = data.get('functionExecutionStatus')
if not fe_status:
raise ValueError('Missing mandatory functionExecutionStatus in header')
status = fe_status.get('status')
if not status:
raise ValueError('Missing mandatory status in header functionExecutionStatus')
if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']:
raise ValueError('Unknown/unspecified status "%s"' % status)
class HttpStatusError(Exception):
pass
class HttpHeaderError(Exception):
pass
class Es2PlusApiError(Exception):
"""Exception representing an error at the ES2+ API level (status != Executed)."""
def __init__(self, func_ex_status: dict):
self.status = func_ex_status['status']
sec = {
'subjectCode': None,
'reasonCode': None,
'subjectIdentifier': None,
'message': None,
}
actual_sec = func_ex_status.get('statusCodeData', None)
sec.update(actual_sec)
self.subject_code = sec['subjectCode']
self.reason_code = sec['reasonCode']
self.subject_id = sec['subjectIdentifier']
self.message = sec['message']
def __str__(self):
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
class Es2PlusApiFunction(abc.ABC):
"""Base classs for representing an ES2+ API Function."""
# the below class variables are expected to be overridden in derived classes
path = None
# dictionary of input parameters. key is parameter name, value is ApiParam class
input_params = {}
# list of mandatory input parameters
input_mandatory = []
# dictionary of output parameters. key is parameter name, value is ApiParam class
output_params = {}
# list of mandatory output parameters (for successful response)
output_mandatory = []
# expected HTTP status code of the response
expected_http_status = 200
def __init__(self, url_prefix: str, func_req_id: str, session):
self.url_prefix = url_prefix
self.func_req_id = func_req_id
self.session = session
def encode(self, data: dict, func_call_id: str) -> dict:
"""Validate an encode input dict into JSON-serializable dict for request body."""
output = {
'header': {
'functionRequesterIdentifier': self.func_req_id,
'functionCallIdentifier': func_call_id
}
}
for p in self.input_mandatory:
if not p in data:
raise ValueError('Mandatory input parameter %s missing' % p)
for p, v in data.items():
p_class = self.input_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
output[p] = v
else:
output[p] = p_class.encode(v)
return output
def decode(self, data: dict) -> dict:
"""[further] Decode and validate the JSON-Dict of the respnse body."""
output = {}
# let's first do the header, it's special
if not 'header' in data:
raise ValueError('Mandatory output parameter "header" missing')
hdr_class = self.output_params.get('header')
output['header'] = hdr_class.decode(data['header'])
if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
raise Es2PlusApiError(output['header']['functionExecutionStatus'])
# we can only expect mandatory parameters to be present in case of successful execution
for p in self.output_mandatory:
if p == 'header':
continue
if not p in data:
raise ValueError('Mandatory output parameter "%s" missing' % p)
for p, v in data.items():
p_class = self.output_params.get(p)
if not p_class:
logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v)
output[p] = v
else:
output[p] = p_class.decode(v)
return output
def call(self, data: dict, func_call_id:str, timeout=10) -> dict:
"""Make an API call to the ES2+ API endpoint represented by this object.
Input data is passed in `data` as json-serializable dict. Output data
is returned as json-deserialized dict."""
url = self.url_prefix + self.path
encoded = json.dumps(self.encode(data, func_call_id))
headers = {
'Content-Type': 'application/json',
'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
}
logger.debug("HTTP REQ %s - '%s'" % (url, encoded))
response = self.session.post(url, data=encoded, headers=headers, timeout=timeout)
logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers))
logger.debug("HTTP RSP: %s" % (response.content))
if response.status_code != self.expected_http_status:
raise HttpStatusError(response)
if not response.headers.get('Content-Type').startswith(headers['Content-Type']):
raise HttpHeaderError(response)
if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
raise HttpHeaderError(response)
return self.decode(response.json())
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
class DownloadOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/downloadOrder'
input_params = {
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType
}
output_params = {
'header': param.JsonResponseHeader,
'iccid': param.Iccid,
}
output_mandatory = ['header', 'iccid']
# ES2+ ConfirmOrder function (SGP.22 section 5.3.2)
class ConfirmOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/confirmOrder'
input_params = {
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
'confirmationCode': param.ConfirmationCode,
'smdsAddress': param.SmdsAddress,
'releaseFlag': param.ReleaseFlag,
}
input_mandatory = ['iccid', 'releaseFlag']
output_params = {
'header': param.JsonResponseHeader,
'eid': param.Eid,
'matchingId': param.MatchingId,
'smdpAddress': param.SmdpAddress,
}
output_mandatory = ['header', 'matchingId']
# ES2+ CancelOrder function (SGP.22 section 5.3.3)
class CancelOrder(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/cancelOrder'
input_params = {
'iccid': param.Iccid,
'eid': param.Eid,
'matchingId': param.MatchingId,
'finalProfileStatusIndicator': param.FinalProfileStatusIndicator,
}
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
output_params = {
'header': param.JsonResponseHeader,
}
output_mandatory = ['header']
# ES2+ ReleaseProfile function (SGP.22 section 5.3.4)
class ReleaseProfile(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/releaseProfile'
input_params = {
'iccid': param.Iccid,
}
input_mandatory = ['iccid']
output_params = {
'header': param.JsonResponseHeader,
}
output_mandatory = ['header']
# ES2+ HandleDownloadProgress function (SGP.22 section 5.3.5)
class HandleDownloadProgressInfo(Es2PlusApiFunction):
path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
input_params = {
'eid': param.Eid,
'iccid': param.Iccid,
'profileType': param.ProfileType,
'timestamp': param.Timestamp,
'notificationPointId': param.NotificationPointId,
'notificationPointStatus': param.NotificationPointStatus,
'resultData': param.ResultData,
}
input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
expected_http_status = 204
class Es2pApiClient:
"""Main class representing a full ES2+ API client. Has one method for each API function."""
def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
self.func_id = 0
self.session = requests.Session()
if server_cert_verify:
self.session.verify = server_cert_verify
if client_cert:
self.session.cert = client_cert
self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session)
self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session)
self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session)
self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session)
self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session)
def _gen_func_id(self) -> str:
"""Generate the next function call id."""
self.func_id += 1
return 'FCI-%u-%u' % (time.time(), self.func_id)
def call_downloadOrder(self, data: dict) -> dict:
"""Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
return self.downloadOrder.call(data, self._gen_func_id())
def call_confirmOrder(self, data: dict) -> dict:
"""Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
return self.confirmOrder.call(data, self._gen_func_id())
def call_cancelOrder(self, data: dict) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
return self.cancelOrder.call(data, self._gen_func_id())
def call_releaseProfile(self, data: dict) -> dict:
"""Perform ES2+ CancelOrder function (SGP.22 section 5.3.4)."""
return self.releaseProfile.call(data, self._gen_func_id())
def call_handleDownloadProgressInfo(self, data: dict) -> dict:
"""Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
return self.handleDownloadProgressInfo.call(data, self._gen_func_id())

View File

@@ -19,10 +19,12 @@
from typing import Optional
import shelve
import copyreg
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography import x509
from collections.abc import MutableMapping
from pySim.esim import compile_asn1_subdir
@@ -96,3 +98,4 @@ class RspSessionState:
class RspSessionStore(shelve.DbfilenameShelf):
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
pass

View File

@@ -16,124 +16,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import io
from typing import Tuple, List, Optional, Dict, Union
import asn1tools
from pySim.utils import bertlv_parse_tag, bertlv_parse_len
from pySim.ts_102_221 import FileDescriptor
from pySim.construct import build_construct
from pySim.esim import compile_asn1_subdir
from pySim.esim.saip import templates
asn1 = compile_asn1_subdir('saip')
class File:
"""Internal representation of a file in a profile filesystem.
Parameters:
pename: Name string of the profile element
l: List of tuples [fileDescriptor, fillFileContent, fillFileOffset profile elements]
template: Applicable FileTemplate describing defaults as per SAIP spec
"""
def __init__(self, pename: str, l: Optional[List[Tuple]] = None, template: Optional[templates.FileTemplate] = None):
self.pe_name = pename
self.template = template
self.fileDescriptor = {}
self.stream = None
# apply some defaults from profile
if self.template:
self.from_template(self.template)
print("after template: %s" % repr(self))
if l:
self.from_tuples(l)
def from_template(self, template: templates.FileTemplate):
"""Determine defaults for file based on given FileTemplate."""
fdb_dec = {}
self.rec_len = None
if template.fid:
self.fileDescriptor['fileID'] = template.fid.to_bytes(2, 'big')
if template.sfi:
self.fileDescriptor['shortEFID'] = bytes([template.sfi])
if template.arr:
self.fileDescriptor['securityAttributesReferenced'] = bytes([template.arr])
# All the files defined in the templates shall have, by default, shareable/not-shareable bit in the file descriptor set to "shareable".
fdb_dec['shareable'] = True
if template.file_type in ['LF', 'CY']:
fdb_dec['file_type'] = 'working_ef'
if template.rec_len:
self.record_len = template.rec_len
if template.nb_rec and template.rec_len:
self.fileDescriptor['efFileSize'] = (template.nb_rec * template.rec_len).to_bytes(2, 'big') # FIXME
if template.file_type == 'LF':
fdb_dec['structure'] = 'linear_fixed'
elif template.file_type == 'CY':
fdb_dec['structure'] = 'cyclic'
elif template.file_type in ['TR', 'BT']:
fdb_dec['file_type'] = 'working_ef'
if template.file_size:
self.fileDescriptor['efFileSize'] = template.file_size.to_bytes(2, 'big') # FIXME
if template.file_type == 'BT':
fdb_dec['structure'] = 'ber_tlv'
elif template.file_type == 'TR':
fdb_dec['structure'] = 'transparent'
elif template.file_type in ['MF', 'DF', 'ADF']:
fdb_dec['file_type'] = 'df'
fdb_dec['structure'] = 'no_info_given'
# build file descriptor based on above input data
fd_dict = {'file_descriptor_byte': fdb_dec}
if self.rec_len:
fd_dict['record_len'] = self.rec_len
self.fileDescriptor['fileDescriptor'] = build_construct(FileDescriptor._construct, fd_dict)
# FIXME: default_val
# FIXME: high_update
# FIXME: params?
def from_tuples(self, l:List[Tuple]):
"""Parse a list of fileDescriptor, fillFileContent, fillFileOffset tuples into this instance."""
def get_fileDescriptor(l:List[Tuple]):
for k, v in l:
if k == 'fileDescriptor':
return v
fd = get_fileDescriptor(l)
if not fd:
raise ValueError("No fileDescriptor found")
self.fileDescriptor.update(dict(fd))
self.stream = self.linearize_file_content(l)
def to_tuples(self) -> List[Tuple]:
"""Generate a list of fileDescriptor, fillFileContent, fillFileOffset tuples into this instance."""
raise NotImplementedError
@staticmethod
def linearize_file_content(l: List[Tuple]) -> Optional[io.BytesIO]:
"""linearize a list of fillFileContent / fillFileOffset tuples into a stream of bytes."""
stream = io.BytesIO()
for k, v in l:
if k == 'doNotCreate':
return None
if k == 'fileDescriptor':
pass
elif k == 'fillFileOffset':
stream.write(b'\xff' * v)
elif k == 'fillFileContent':
stream.write(v)
else:
return ValueError("Unknown key '%s' in tuple list" % k)
return stream
def __str__(self) -> str:
return "File(%s)" % self.pe_name
def __repr__(self) -> str:
return "File(%s): %s" % (self.pe_name, self.fileDescriptor)
class ProfileElement:
"""Class representing a Profile Element (PE) within a SAIP Profile."""
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
'csim', 'opt-csim', 'eap', 'df-5gs', 'df-saip', 'df-snpn', 'df-5gprose', 'iot', 'opt-iot']
def _fixup_sqnInit_dec(self) -> None:
"""asn1tools has a bug when working with SEQUENCE OF that have DEFAULT values. Let's work around
this."""
@@ -167,31 +59,6 @@ class ProfileElement:
# work around asn1tools bug regarding DEFAULT for a SEQUENCE OF
self._fixup_sqnInit_dec()
@property
def header_name(self) -> str:
"""Return the name of the header field within the profile element."""
# unneccessarry compliaction by inconsistent naming :(
if self.type.startswith('opt-'):
return self.type.replace('-','') + '-header'
return self.type + '-header'
@property
def header(self):
"""Return the decoded ProfileHeader."""
return self.decoded.get(self.header_name, None)
@property
def templateID(self):
"""Return the decoded templateID used by this profile element (if any)."""
return self.decoded.get('templateID', None)
@property
def files(self):
"""Return dict of decoded 'File' ASN.1 items."""
if not self.type in self.FILE_BEARING:
return {}
return {k:v for (k,v) in self.decoded.items() if k not in ['templateID', self.header_name]}
@classmethod
def from_der(cls, der: bytes) -> 'ProfileElement':
"""Construct an instance from given raw, DER encoded bytes."""
@@ -212,7 +79,7 @@ class ProfileElement:
def bertlv_first_segment(binary: bytes) -> Tuple[bytes, bytes]:
"""obtain the first segment of a binary concatenation of BER-TLV objects.
Returns: tuple of first TLV and remainder."""
_tagdict, remainder = bertlv_parse_tag(binary)
tagdict, remainder = bertlv_parse_tag(binary)
length, remainder = bertlv_parse_len(remainder)
tl_length = len(binary) - len(remainder)
tlv_length = tl_length + length
@@ -226,12 +93,9 @@ class ProfileElementSequence:
self.pes_by_naa: Dict = {}
def get_pes_for_type(self, tname: str) -> List[ProfileElement]:
"""Return list of profile elements present for given profile element type."""
return self.pe_by_type.get(tname, [])
def get_pe_for_type(self, tname: str) -> Optional[ProfileElement]:
"""Return a single profile element for given profile element type. Works only for
types of which there is only a signle instance in the PE Sequence!"""
l = self.get_pes_for_type(tname)
if len(l) == 0:
return None
@@ -282,7 +146,7 @@ class ProfileElementSequence:
cur_naa_list = []
cur_naa_list.append(pe)
# append the final one
if cur_naa and len(cur_naa_list) > 0:
if cur_naa and len(cur_naa_list):
if not cur_naa in self.pes_by_naa:
self.pes_by_naa[cur_naa] = []
self.pes_by_naa[cur_naa].append(cur_naa_list)

View File

@@ -1,90 +0,0 @@
# Data sources: Provding data for profile personalization
#
# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import secrets
from Cryptodome.Random import get_random_bytes
class DataSource(abc.ABC):
"""Base class for something that can provide data during a personalization process."""
@abc.abstractmethod
def generate_one(self):
pass
class DataSourceFixed(DataSource):
"""A data source that provides a fixed value (of any type).
Parameters:
fixed_value: The fixed value that shall be used during each data generation
"""
def __init__(self, fixed_value, **kwargs):
self.fixed_value = fixed_value
super().__init__(**kwargs)
def generate_one(self):
return self.fixed_value
class DataSourceIncrementing(DataSource):
"""A data source that provides incrementing integer numbers.
Parameters:
base_value: The start value (value returned during first data generation)
step_size: Increment step size (Default: 1)
"""
def __init__(self, base_value: int, **kwargs):
self.base_value = int(base_value)
self.step_size = kwargs.pop('step_size', 1)
self.i = 0
super().__init__(**kwargs)
def generate_one(self):
val = self.base_value + self.i
self.i += self.step_size
return val
class DataSourceRandomBytes(DataSource):
"""A data source that provides a configurable number of random bytes.
Parameters:
size: Number of bytes to generate each turn
"""
def __init__(self, size: int, **kwargs):
self.size = size
super().__init__(**kwargs)
def generate_one(self):
return get_random_bytes(self.size)
class DataSourceRandomUInt(DataSource):
"""A data source that provides a configurable unsigned integer value.
Parameters:
below: Number one greater than the maximum permitted random unsigned integer
"""
def __init__(self, below: int, **kwargs):
self.below = below
super().__init__(**kwargs)
def generate_one(self):
return secrets.randbelow(self.below)

View File

@@ -27,7 +27,7 @@ class OID:
return '.'.join([str(x) for x in intlist])
def __init__(self, initializer: Union[List[int], str]):
if isinstance(initializer, str):
if type(initializer) == str:
self.intlist = self.intlist_from_str(initializer)
else:
self.intlist = initializer
@@ -43,7 +43,7 @@ class eOID(OID):
"""OID helper for TCA eUICC prefix"""
__prefix = [2,23,143,1]
def __init__(self, initializer):
if isinstance(initializer, str):
if type(initializer) == str:
initializer = self.intlist_from_str(initializer)
super().__init__(self.__prefix + initializer)

View File

@@ -16,11 +16,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
import io
from typing import List, Tuple
from typing import List, Tuple, Optional
from pySim.tlv import camel_to_snake
from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid
from pySim.esim.saip import ProfileElement, ProfileElementSequence
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
@@ -35,170 +32,42 @@ def file_replace_content(file: List[Tuple], new_content: bytes):
return file
class ClassVarMeta(abc.ABCMeta):
"""Metaclass that puts all additional keyword-args into the class. We use this to have one
class definition for something like a PIN, and then have derived classes for PIN1, PIN2, ..."""
"""Metaclass that puts all additional keyword-args into the class."""
def __new__(metacls, name, bases, namespace, **kwargs):
#print("Meta_new_(metacls=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (metacls, name, bases, namespace, kwargs))
x = super().__new__(metacls, name, bases, namespace)
for k, v in kwargs.items():
setattr(x, k, v)
setattr(x, 'name', camel_to_snake(name))
return x
class ConfigurableParameter(abc.ABC, metaclass=ClassVarMeta):
"""Base class representing a part of the eSIM profile that is configurable during the
personalization process (with dynamic data from elsewhere)."""
def __init__(self, input_value):
self.input_value = input_value # the raw input value as given by caller
self.value = None # the processed input value (e.g. with check digit) as produced by validate()
def validate(self):
"""Optional validation method. Can be used by derived classes to perform validation
of the input value (self.value). Will raise an exception if validation fails."""
# default implementation: simply copy input_value over to value
self.value = self.input_value
def __init__(self, value):
self.value = value
@abc.abstractmethod
def apply(self, pes: ProfileElementSequence):
def apply(self, pe_seq: ProfileElementSequence):
pass
class Iccid(ConfigurableParameter):
"""Configurable ICCID. Expects the value to be a string of decimal digits.
If the string of digits is only 18 digits long, a Luhn check digit will be added."""
def validate(self):
# convert to string as it migt be an integer
iccid_str = str(self.input_value)
if len(iccid_str) < 18 or len(iccid_str) > 20:
raise ValueError('ICCID must be 18, 19 or 20 digits long')
if not iccid_str.isdecimal():
raise ValueError('ICCID must only contain decimal digits')
self.value = sanitize_iccid(iccid_str)
"""Configurable ICCID. Expects the value to be in EF.ICCID format."""
name = 'iccid'
def apply(self, pes: ProfileElementSequence):
# patch the header
pes.get_pe_for_type('header').decoded['iccid'] = h2b(rpad(self.value, 20))
# patch the header; FIXME: swap nibbles!
pes.get_pe_for_type('header').decoded['iccid'] = self.value
# patch MF/EF.ICCID
file_replace_content(pes.get_pe_for_type('mf').decoded['ef-iccid'], h2b(enc_iccid(self.value)))
file_replace_content(pes.get_pe_for_type('mf').decoded['ef-iccid'], bytes(self.value))
class Imsi(ConfigurableParameter):
"""Configurable IMSI. Expects value to be a string of digits. Automatically sets the ACC to
the last digit of the IMSI."""
def validate(self):
# convert to string as it migt be an integer
imsi_str = str(self.input_value)
if len(imsi_str) < 6 or len(imsi_str) > 15:
raise ValueError('IMSI must be 6..15 digits long')
if not imsi_str.isdecimal():
raise ValueError('IMSI must only contain decimal digits')
self.value = imsi_str
"""Configurable IMSI. Expects value to be n EF.IMSI format."""
name = 'imsi'
def apply(self, pes: ProfileElementSequence):
imsi_str = self.value
# we always use the least significant byte of the IMSI as ACC
acc = (1 << int(imsi_str[-1]))
# patch ADF.USIM/EF.IMSI
for pe in pes.get_pes_for_type('usim'):
file_replace_content(pe.decoded['ef-imsi'], h2b(enc_imsi(imsi_str)))
file_replace_content(pe.decoded['ef-acc'], acc.to_bytes(2, 'big'))
for pe in pes.get_pes_by_type('usim'):
file_replace_content(pe.decoded['ef-imsi'], self.value)
# TODO: DF.GSM_ACCESS if not linked?
class SdKey(ConfigurableParameter, metaclass=ClassVarMeta):
"""Configurable Security Domain (SD) Key. Value is presented as bytes."""
# these will be set by derived classes
key_type = None
key_id = None
kvn = None
key_usage_qual = None
permitted_len = None
def validate(self):
if not isinstance(self.input_value, (io.BytesIO, bytes, bytearray)):
raise ValueError('Value must be of bytes-like type')
if self.permitted_len:
if len(self.input_value) not in self.permitted_len:
raise ValueError('Value length must be %s' % self.permitted_len)
self.value = self.input_value
def _apply_sd(self, pe: ProfileElement):
assert pe.type == 'securityDomain'
for key in pe.decoded['keyList']:
if key['keyIdentifier'][0] == self.key_id and key['keyVersionNumber'][0] == self.kvn:
assert len(key['keyComponents']) == 1
key['keyComponents'][0]['keyData'] = self.value
return
# Could not find matching key to patch, create a new one
key = {
'keyUsageQualifier': bytes([self.key_usage_qual]),
'keyIdentifier': bytes([self.key_id]),
'keyVersionNumber': bytes([self.kvn]),
'keyComponents': [
{ 'keyType': bytes([self.key_type]), 'keyData': self.value },
]
}
pe.decoded['keyList'].append(key)
def apply(self, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('securityDomain'):
self._apply_sd(pe)
class SdKeyScp80_01(SdKey, kvn=0x01, key_type=0x88, permitted_len=[16,24,32]): # AES key type
pass
class SdKeyScp80_01Kic(SdKeyScp80_01, key_id=0x01, key_usage_qual=0x18): # FIXME: ordering?
pass
class SdKeyScp80_01Kid(SdKeyScp80_01, key_id=0x02, key_usage_qual=0x14):
pass
class SdKeyScp80_01Kik(SdKeyScp80_01, key_id=0x03, key_usage_qual=0x48):
pass
class SdKeyScp81_01(SdKey, kvn=0x81): # FIXME
pass
class SdKeyScp81_01Psk(SdKeyScp81_01, key_id=0x01, key_type=0x85, key_usage_qual=0x3C):
pass
class SdKeyScp81_01Dek(SdKeyScp81_01, key_id=0x02, key_type=0x88, key_usage_qual=0x48):
pass
class SdKeyScp02_20(SdKey, kvn=0x20, key_type=0x88, permitted_len=[16,24,32]): # AES key type
pass
class SdKeyScp02_20Enc(SdKeyScp02_20, key_id=0x01, key_usage_qual=0x18):
pass
class SdKeyScp02_20Mac(SdKeyScp02_20, key_id=0x02, key_usage_qual=0x14):
pass
class SdKeyScp02_20Dek(SdKeyScp02_20, key_id=0x03, key_usage_qual=0x48):
pass
class SdKeyScp03_30(SdKey, kvn=0x30, key_type=0x88, permitted_len=[16,24,32]): # AES key type
pass
class SdKeyScp03_30Enc(SdKeyScp03_30, key_id=0x01, key_usage_qual=0x18):
pass
class SdKeyScp03_30Mac(SdKeyScp03_30, key_id=0x02, key_usage_qual=0x14):
pass
class SdKeyScp03_30Dek(SdKeyScp03_30, key_id=0x03, key_usage_qual=0x48):
pass
class SdKeyScp03_31(SdKey, kvn=0x31, key_type=0x88, permitted_len=[16,24,32]): # AES key type
pass
class SdKeyScp03_31Enc(SdKeyScp03_31, key_id=0x01, key_usage_qual=0x18):
pass
class SdKeyScp03_31Mac(SdKeyScp03_31, key_id=0x02, key_usage_qual=0x14):
pass
class SdKeyScp03_31Dek(SdKeyScp03_31, key_id=0x03, key_usage_qual=0x48):
pass
class SdKeyScp03_32(SdKey, kvn=0x32, key_type=0x88, permitted_len=[16,24,32]): # AES key type
pass
class SdKeyScp03_32Enc(SdKeyScp03_32, key_id=0x01, key_usage_qual=0x18):
pass
class SdKeyScp03_32Mac(SdKeyScp03_32, key_id=0x02, key_usage_qual=0x14):
pass
class SdKeyScp03_32Dek(SdKeyScp03_32, key_id=0x03, key_usage_qual=0x48):
pass
def obtain_singleton_pe_from_pelist(l: List[ProfileElement], wanted_type: str) -> ProfileElement:
filtered = list(filter(lambda x: x.type == wanted_type, l))
assert len(filtered) == 1
@@ -209,25 +78,13 @@ def obtain_first_pe_from_pelist(l: List[ProfileElement], wanted_type: str) -> Pr
return filtered[0]
class Puk(ConfigurableParameter, metaclass=ClassVarMeta):
"""Configurable PUK (Pin Unblock Code). String ASCII-encoded digits."""
keyReference = None
def validate(self):
if isinstance(self.input_value, int):
self.value = '%08d' % self.input_value
else:
self.value = self.input_value
# FIXME: valid length?
if not self.value.isdecimal():
raise ValueError('PUK must only contain decimal digits')
def apply(self, pes: ProfileElementSequence):
puk = ''.join(['%02x' % (ord(x)) for x in self.value])
padded_puk = rpad(puk, 16)
mf_pes = pes.pes_by_naa['mf'][0]
pukCodes = obtain_singleton_pe_from_pelist(mf_pes, 'pukCodes')
for pukCode in pukCodes.decoded['pukCodes']:
if pukCode['keyReference'] == self.keyReference:
pukCode['pukValue'] = h2b(padded_puk)
pukCode['pukValue'] = self.value
return
raise ValueError('cannot find pukCode')
class Puk1(Puk, keyReference=0x01):
@@ -236,52 +93,29 @@ class Puk2(Puk, keyReference=0x81):
pass
class Pin(ConfigurableParameter, metaclass=ClassVarMeta):
"""Configurable PIN (Personal Identification Number). String of digits."""
keyReference = None
def validate(self):
if isinstance(self.input_value, int):
self.value = '%04d' % self.input_value
else:
self.value = self.input_value
if len(self.value) < 4 or len(self.value) > 8:
raise ValueError('PIN mus be 4..8 digits long')
if not self.value.isdecimal():
raise ValueError('PIN must only contain decimal digits')
def apply(self, pes: ProfileElementSequence):
pin = ''.join(['%02x' % (ord(x)) for x in self.value])
padded_pin = rpad(pin, 16)
mf_pes = pes.pes_by_naa['mf'][0]
pinCodes = obtain_first_pe_from_pelist(mf_pes, 'pinCodes')
if pinCodes.decoded['pinCodes'][0] != 'pinconfig':
return
for pinCode in pinCodes.decoded['pinCodes'][1]:
if pinCode['keyReference'] == self.keyReference:
pinCode['pinValue'] = h2b(padded_pin)
pinCode['pinValue'] = self.value
return
raise ValueError('cannot find pinCode')
class AppPin(ConfigurableParameter, metaclass=ClassVarMeta):
"""Configurable PIN (Personal Identification Number). String of digits."""
keyReference = None
def validate(self):
if isinstance(self.input_value, int):
self.value = '%04d' % self.input_value
else:
self.value = self.input_value
if len(self.value) < 4 or len(self.value) > 8:
raise ValueError('PIN mus be 4..8 digits long')
if not self.value.isdecimal():
raise ValueError('PIN must only contain decimal digits')
def _apply_one(self, pe: ProfileElement):
pin = ''.join(['%02x' % (ord(x)) for x in self.value])
padded_pin = rpad(pin, 16)
pinCodes = obtain_first_pe_from_pelist(pe, 'pinCodes')
if pinCodes.decoded['pinCodes'][0] != 'pinconfig':
return
for pinCode in pinCodes.decoded['pinCodes'][1]:
if pinCode['keyReference'] == self.keyReference:
pinCode['pinValue'] = h2b(padded_pin)
pinCode['pinValue'] = self.value
return
raise ValueError('cannot find pinCode')
def apply(self, pes: ProfileElementSequence):
for naa in pes.pes_by_naa:
if naa not in ['usim','isim','csim','telecom']:
@@ -300,12 +134,7 @@ class Adm2(Pin, keyReference=0x0B):
class AlgoConfig(ConfigurableParameter, metaclass=ClassVarMeta):
"""Configurable Algorithm parameter. bytes."""
key = None
def validate(self):
if not isinstance(self.input_value, (io.BytesIO, bytes, bytearray)):
raise ValueError('Value must be of bytes-like type')
self.value = self.input_value
def apply(self, pes: ProfileElementSequence):
for pe in pes.get_pes_for_type('akaParameter'):
algoConfiguration = pe.decoded['algoConfiguration']
@@ -318,7 +147,5 @@ class K(AlgoConfig, key='key'):
class Opc(AlgoConfig, key='opc'):
pass
class AlgorithmID(AlgoConfig, key='algorithmID'):
def validate(self):
if self.input_value not in [1, 2, 3]:
raise ValueError('Invalid algorithmID %s' % (self.input_value))
self.value = self.input_value
pass

View File

@@ -53,9 +53,9 @@ class FileTemplate:
return "FileTemplate(%s)" % (self.name)
def __repr__(self) -> str:
s_fid = "%04x" % self.fid if self.fid is not None else 'None'
s_arr = self.arr if self.arr is not None else 'None'
s_sfi = "%02x" % self.sfi if self.sfi is not None else 'None'
s_fid = "%04x" % self.fid if self.fid != None else 'None'
s_arr = self.arr if self.arr != None else 'None'
s_sfi = "%02x" % self.sfi if self.sfi != None else 'None'
return "FileTemplate(%s/%s, %s, %s, arr=%s, sfi=%s)" % (self.name, self.pe_name, s_fid,
self.file_type, s_arr, s_sfi)
@@ -93,7 +93,7 @@ class ProfileTemplateRegistry:
def get_by_oid(cls, oid: Union[List[int], str]) -> Optional[ProfileTemplate]:
"""Look-up the ProfileTemplate based on its OID. The OID can be given either in dotted-string format,
or as a list of integers."""
if not isinstance(oid, str):
if type(oid) is not str:
oid = OID.OID.str_from_intlist(oid)
return cls.by_oid.get(oid, None)
@@ -103,7 +103,7 @@ class ProfileTemplateRegistry:
# Section 9.2
class FilesAtMF(ProfileTemplate):
created_by_default = True
oid = OID.MF
oid = OID.MF,
files = [
FileTemplate(0x3f00, 'MF', 'MF', None, None, 14, None, None, None, params=['pinStatusTemplateDO']),
FileTemplate(0x2f05, 'EF.PL', 'TR', None, 2, 1, 0x05, 'FF...FF', None),
@@ -588,7 +588,7 @@ class FilesIsimMandatory(ProfileTemplate):
created_by_default = True
oid = OID.ADF_ISIM_by_default
files = [
FileTemplate( None, 'ADF.ISIM', 'ADF', None, None, 14, None, None, False, ['aid','temporary_fid','pinStatusTemplateDO']),
FileTemplate( None, 'ADF.ISIM', 'ADF', None, None, 14, None, None, False, ['aid','temporary_fid''pinStatusTemplateDO']),
FileTemplate(0x6f02, 'EF.IMPI', 'TR', None, None, 2, 0x02, None, True, ['size']),
FileTemplate(0x6f04, 'EF.IMPU', 'LF', 1, None, 2, 0x04, None, True, ['size']),
FileTemplate(0x6f03, 'EF.Domain', 'TR', None, None, 2, 0x05, None, True, ['size']),

View File

@@ -92,37 +92,5 @@ class CheckBasicStructure(ProfileConstraintChecker):
raise ProfileError('get-identity mandatory, but no usim or isim')
if 'profile-a-x25519' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
raise ProfileError('profile-a-x25519 mandatory, but no usim or isim')
if 'profile-a-p256' in m_svcs and not ('usim' in m_svcs or 'isim' in m_svcs):
if 'profile-a-p256' in m_svcs and not not ('usim' in m_svcs or 'isim' in m_svcs):
raise ProfileError('profile-a-p256 mandatory, but no usim or isim')
FileChoiceList = List[Tuple]
class FileError(ProfileError):
pass
class FileConstraintChecker:
def check(self, l: FileChoiceList):
for name in dir(self):
if name.startswith('check_'):
method = getattr(self, name)
method(l)
class FileCheckBasicStructure(FileConstraintChecker):
def check_seqence(self, l: FileChoiceList):
by_type = {}
for k, v in l:
if k in by_type:
by_type[k].append(v)
else:
by_type[k] = [v]
if 'doNotCreate' in by_type:
if len(l) != 1:
raise FileError("doNotCreate must be the only element")
if 'fileDescriptor' in by_type:
if len(by_type['fileDescriptor']) != 1:
raise FileError("fileDescriptor must be the only element")
if l[0][0] != 'fileDescriptor':
raise FileError("fileDescriptor must be the first element")
def check_forbidden(self, l: FileChoiceList):
"""Perform checks for forbidden parameters as described in Section 8.3.3."""

View File

@@ -19,11 +19,11 @@
import requests
from typing import Optional, List
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from cryptography import x509
from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from pySim.utils import b2h
@@ -67,6 +67,7 @@ class oid:
class VerifyError(Exception):
"""An error during certificate verification,"""
pass
class CertificateSet:
"""A set of certificates consisting of a trusted [self-signed] CA root certificate,
@@ -87,7 +88,7 @@ class CertificateSet:
self.crl = None
def load_crl(self, urls: Optional[List[str]] = None):
if urls and isinstance(urls, str):
if urls and type(urls) is str:
urls = [urls]
if not urls:
# generate list of CRL URLs from root CA certificate
@@ -101,7 +102,7 @@ class CertificateSet:
for url in urls:
try:
crl_bytes = requests.get(url, timeout=10)
crl_bytes = requests.get(url)
except requests.exceptions.ConnectionError:
continue
crl = x509.load_der_x509_crl(crl_bytes)
@@ -159,6 +160,7 @@ class CertificateSet:
if depth > max_depth:
raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth)
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
"""convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes."""

View File

@@ -21,20 +21,20 @@ Related Specs: GSMA SGP.22, GSMA SGP.02, etc.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
from construct import Array, Struct, FlagsEnum, GreedyRange
from cmd2 import cmd2, CommandSet, with_default_category
from pySim.tlv import *
from pySim.construct import *
from construct import Optional as COptional
from construct import *
import argparse
from cmd2 import cmd2, CommandSet, with_default_category
from pySim.commands import SimCardCommands
from pySim.utils import Hexstr, SwHexstr, SwMatchstr
from pySim.filesystem import CardADF, CardApplication
from pySim.utils import Hexstr, SwHexstr
import pySim.global_platform
def compute_eid_checksum(eid) -> str:
"""Compute and add/replace check digits of an EID value according to GSMA SGP.29 Section 10."""
if isinstance(eid, str):
if type(eid) == str:
if len(eid) == 30:
# first pad by 2 digits
eid += "00"
@@ -44,7 +44,7 @@ def compute_eid_checksum(eid) -> str:
else:
raise ValueError("and EID must be 30 or 32 digits")
eid_int = int(eid)
elif isinstance(eid, int):
elif type(eid) == int:
eid_int = eid
if eid_int % 100:
# zero the last two digits
@@ -315,14 +315,14 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
self.adf.shell_commands += [self.AddlShellCommands()]
@staticmethod
def store_data(scc: SimCardCommands, tx_do: Hexstr, exp_sw: SwMatchstr ="9000") -> Tuple[Hexstr, SwHexstr]:
def store_data(scc: SimCardCommands, tx_do: Hexstr) -> Tuple[Hexstr, SwHexstr]:
"""Perform STORE DATA according to Table 47+48 in Section 5.7.2 of SGP.22.
Only single-block store supported for now."""
capdu = '%sE29100%02x%s' % (scc.cla4lchan('80'), len(tx_do)//2, tx_do)
return scc.send_apdu_checksw(capdu, exp_sw)
return scc.send_apdu_checksw(capdu)
@staticmethod
def store_data_tlv(scc: SimCardCommands, cmd_do, resp_cls, exp_sw: SwMatchstr = '9000'):
def store_data_tlv(scc: SimCardCommands, cmd_do, resp_cls, exp_sw='9000'):
"""Transceive STORE DATA APDU with the card, transparently encoding the command data from TLV
and decoding the response data tlv."""
if cmd_do:
@@ -332,7 +332,7 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
return ValueError('DO > 255 bytes not supported yet')
else:
cmd_do_enc = b''
(data, _sw) = CardApplicationISDR.store_data(scc, b2h(cmd_do_enc), exp_sw=exp_sw)
(data, sw) = CardApplicationISDR.store_data(scc, b2h(cmd_do_enc))
if data:
if resp_cls:
resp_do = resp_cls()
@@ -358,9 +358,9 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
@cmd2.with_argparser(es10x_store_data_parser)
def do_es10x_store_data(self, opts):
"""Perform a raw STORE DATA command as defined for the ES10x eUICC interface."""
(_data, _sw) = CardApplicationISDR.store_data(self._cmd.lchan.scc, opts.TX_DO)
(data, sw) = CardApplicationISDR.store_data(self._cmd.lchan.scc, opts.TX_DO)
def do_get_euicc_configured_addresses(self, _opts):
def do_get_euicc_configured_addresses(self, opts):
"""Perform an ES10a GetEuiccConfiguredAddresses function."""
eca = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, EuiccConfiguredAddresses(), EuiccConfiguredAddresses)
d = eca.to_dict()
@@ -377,25 +377,25 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
d = sdda.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['set_default_dp_address']))
def do_get_euicc_challenge(self, _opts):
def do_get_euicc_challenge(self, opts):
"""Perform an ES10b GetEUICCChallenge function."""
gec = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, GetEuiccChallenge(), GetEuiccChallenge)
d = gec.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['get_euicc_challenge']))
def do_get_euicc_info1(self, _opts):
def do_get_euicc_info1(self, opts):
"""Perform an ES10b GetEUICCInfo (1) function."""
ei1 = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, EuiccInfo1(), EuiccInfo1)
d = ei1.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['euicc_info1']))
def do_get_euicc_info2(self, _opts):
def do_get_euicc_info2(self, opts):
"""Perform an ES10b GetEUICCInfo (2) function."""
ei2 = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, EuiccInfo2(), EuiccInfo2)
d = ei2.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['euicc_info2']))
def do_list_notification(self, _opts):
def do_list_notification(self, opts):
"""Perform an ES10b ListNotification function."""
ln = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, ListNotificationReq(), ListNotificationResp)
d = ln.to_dict()
@@ -412,7 +412,7 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
d = rn.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['notification_sent_resp']))
def do_get_profiles_info(self, _opts):
def do_get_profiles_info(self, opts):
"""Perform an ES10c GetProfilesInfo function."""
pi = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, ProfileInfoListReq(), ProfileInfoListResp)
d = pi.to_dict()
@@ -475,9 +475,9 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
self._cmd.poutput_json(flatten_dict_lists(d['delete_profile_resp']))
def do_get_eid(self, _opts):
def do_get_eid(self, opts):
"""Perform an ES10c GetEID function."""
(_data, _sw) = CardApplicationISDR.store_data(self._cmd.lchan.scc, 'BF3E035C015A')
(data, sw) = CardApplicationISDR.store_data(self._cmd.lchan.scc, 'BF3E035C015A')
ged_cmd = GetEuiccData(children=[TagList(decoded=[0x5A])])
ged = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, ged_cmd, GetEuiccData)
d = ged.to_dict()
@@ -497,13 +497,13 @@ class CardApplicationISDR(pySim.global_platform.CardApplicationSD):
d = sn.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['set_nickname_resp']))
def do_get_certs(self, _opts):
def do_get_certs(self, opts):
"""Perform an ES10c GetCerts() function on an IoT eUICC."""
gc = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, GetCertsReq(), GetCertsResp)
d = gc.to_dict()
self._cmd.poutput_json(flatten_dict_lists(d['get_certficiates_resp']))
def do_get_eim_configuration_data(self, _opts):
def do_get_eim_configuration_data(self, opts):
"""Perform an ES10b GetEimConfigurationData function on an Iot eUICC."""
gec = CardApplicationISDR.store_data_tlv(self._cmd.lchan.scc, GetEimConfigurationData(),
GetEimConfigurationData)

View File

@@ -24,14 +24,17 @@
class NoCardError(Exception):
"""No card was found in the reader."""
pass
class ProtocolError(Exception):
"""Some kind of protocol level error interfacing with the card."""
pass
class ReaderError(Exception):
"""Some kind of general error with the card reader."""
pass
class SwMatchError(Exception):

View File

@@ -24,20 +24,24 @@ not the actual contents / runtime state of interacting with a given smart card.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import cast, Optional, Iterable, List, Dict, Tuple, Union
import argparse
import code
import tempfile
import json
import abc
import inspect
import cmd2
from cmd2 import CommandSet, with_default_category
from cmd2 import CommandSet, with_default_category, with_argparser
import argparse
from typing import cast, Optional, Iterable, List, Dict, Tuple, Union
from smartcard.util import toBytes
from pySim.utils import sw_match, h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, is_hexstr
from pySim.utils import sw_match, h2b, b2h, i2h, is_hex, auto_int, auto_uint8, auto_uint16, Hexstr, is_hexstr
from pySim.construct import filter_dict, parse_construct, build_construct
from pySim.jsonpath import js_path_modify
from pySim.exceptions import *
from pySim.jsonpath import js_path_find, js_path_modify
from pySim.commands import SimCardCommands
# int: a single service is associated with this file
@@ -67,7 +71,7 @@ class CardFile:
profile : Card profile that this file should be part of
service : Service (SST/UST/IST) associated with the file
"""
if not isinstance(self, CardADF) and fid is None:
if not isinstance(self, CardADF) and fid == None:
raise ValueError("fid is mandatory")
if fid:
fid = fid.lower()
@@ -132,7 +136,6 @@ class CardFile:
return ret
def build_select_path_to(self, target: 'CardFile') -> Optional[List['CardFile']]:
"""Build the relative sequence of files we need to traverse to get from us to 'target'."""
# special-case handling for applications. Applications may be selected
# any time from any location. If there is an ADF somewhere in the path,
@@ -143,6 +146,7 @@ class CardFile:
return inter_path[i:]
return inter_path
"""Build the relative sequence of files we need to traverse to get from us to 'target'."""
# special-case handling for selecting MF while the MF is selected
if target == target.get_mf():
return [target]
@@ -163,7 +167,7 @@ class CardFile:
def get_mf(self) -> Optional['CardMF']:
"""Return the MF (root) of the file system."""
if self.parent is None:
if self.parent == None:
return None
# iterate towards the top. MF has parent == self
node = self
@@ -278,22 +282,23 @@ class CardFile:
"""Assuming the provided list of activated services, should this file exist and be activated?."""
if self.service is None:
return None
if isinstance(self.service, int):
elif isinstance(self.service, int):
# a single service determines the result
return self.service in services
if isinstance(self.service, list):
elif isinstance(self.service, list):
# any of the services active -> true
for s in self.service:
if s in services:
return True
return False
if isinstance(self.service, tuple):
elif isinstance(self.service, tuple):
# all of the services active -> true
for s in self.service:
if not s in services:
return False
return True
raise ValueError("self.service must be either int or list or tuple")
else:
raise ValueError("self.service must be either int or list or tuple")
class CardDF(CardFile):
@@ -301,14 +306,15 @@ class CardDF(CardFile):
@with_default_category('DF/ADF Commands')
class ShellCommands(CommandSet):
pass
def __init__(self):
super().__init__()
def __init__(self, **kwargs):
if not isinstance(self, CardADF):
if 'fid' not in kwargs:
if not 'fid' in kwargs:
raise TypeError('fid is mandatory for all DF')
super().__init__(**kwargs)
self.children = {}
self.children = dict()
self.shell_commands = [self.ShellCommands()]
# dict of CardFile affected by service(int), indexed by service
self.files_by_service = {}
@@ -409,7 +415,7 @@ class CardDF(CardFile):
def lookup_file_by_name(self, name: Optional[str]) -> Optional[CardFile]:
"""Find a file with given name within current DF."""
if name is None:
if name == None:
return None
for i in self.children.values():
if i.name and i.name == name:
@@ -418,7 +424,7 @@ class CardDF(CardFile):
def lookup_file_by_sfid(self, sfid: Optional[str]) -> Optional[CardFile]:
"""Find a file with given short file ID within current DF."""
if sfid is None:
if sfid == None:
return None
for i in self.children.values():
if i.sfid == int(str(sfid)):
@@ -443,7 +449,7 @@ class CardMF(CardDF):
# cannot be overridden; use assignment
kwargs['parent'] = self
super().__init__(**kwargs)
self.applications = {}
self.applications = dict()
def __str__(self):
return "MF(%s)" % (self.fid)
@@ -567,6 +573,9 @@ class TransparentEF(CardEF):
class ShellCommands(CommandSet):
"""Shell commands specific for transparent EFs."""
def __init__(self):
super().__init__()
dec_hex_parser = argparse.ArgumentParser()
dec_hex_parser.add_argument('--oneline', action='store_true',
help='No JSON pretty-printing, dump as a single line')
@@ -587,7 +596,7 @@ class TransparentEF(CardEF):
@cmd2.with_argparser(read_bin_parser)
def do_read_binary(self, opts):
"""Read binary data from a transparent EF"""
(data, _sw) = self._cmd.lchan.read_binary(opts.length, opts.offset)
(data, sw) = self._cmd.lchan.read_binary(opts.length, opts.offset)
self._cmd.poutput(data)
read_bin_dec_parser = argparse.ArgumentParser()
@@ -597,7 +606,7 @@ class TransparentEF(CardEF):
@cmd2.with_argparser(read_bin_dec_parser)
def do_read_binary_decoded(self, opts):
"""Read + decode data from a transparent EF"""
(data, _sw) = self._cmd.lchan.read_binary_dec()
(data, sw) = self._cmd.lchan.read_binary_dec()
self._cmd.poutput_json(data, opts.oneline)
upd_bin_parser = argparse.ArgumentParser()
@@ -608,7 +617,7 @@ class TransparentEF(CardEF):
@cmd2.with_argparser(upd_bin_parser)
def do_update_binary(self, opts):
"""Update (Write) data of a transparent EF"""
(data, _sw) = self._cmd.lchan.update_binary(opts.data, opts.offset)
(data, sw) = self._cmd.lchan.update_binary(opts.data, opts.offset)
if data:
self._cmd.poutput(data)
@@ -621,18 +630,18 @@ class TransparentEF(CardEF):
def do_update_binary_decoded(self, opts):
"""Encode + Update (Write) data of a transparent EF"""
if opts.json_path:
(data_json, _sw) = self._cmd.lchan.read_binary_dec()
(data_json, sw) = self._cmd.lchan.read_binary_dec()
js_path_modify(data_json, opts.json_path,
json.loads(opts.data))
else:
data_json = json.loads(opts.data)
(data, _sw) = self._cmd.lchan.update_binary_dec(data_json)
(data, sw) = self._cmd.lchan.update_binary_dec(data_json)
if data:
self._cmd.poutput_json(data)
def do_edit_binary_decoded(self, _opts):
def do_edit_binary_decoded(self, opts):
"""Edit the JSON representation of the EF contents in an editor."""
(orig_json, _sw) = self._cmd.lchan.read_binary_dec()
(orig_json, sw) = self._cmd.lchan.read_binary_dec()
with tempfile.TemporaryDirectory(prefix='pysim_') as dirname:
filename = '%s/file' % dirname
# write existing data as JSON to file
@@ -645,7 +654,7 @@ class TransparentEF(CardEF):
if edited_json == orig_json:
self._cmd.poutput("Data not modified, skipping write")
else:
(data, _sw) = self._cmd.lchan.update_binary_dec(edited_json)
(data, sw) = self._cmd.lchan.update_binary_dec(edited_json)
if data:
self._cmd.poutput_json(data)
@@ -686,7 +695,7 @@ class TransparentEF(CardEF):
return method(b2h(raw_bin_data))
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -713,7 +722,7 @@ class TransparentEF(CardEF):
return method(raw_bin_data)
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -739,7 +748,7 @@ class TransparentEF(CardEF):
return h2b(method(abstract_data))
if self._construct:
return build_construct(self._construct, abstract_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return t.to_tlv()
@@ -767,7 +776,7 @@ class TransparentEF(CardEF):
return b2h(raw_bin_data)
if self._construct:
return b2h(build_construct(self._construct, abstract_data))
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return b2h(t.to_tlv())
@@ -784,6 +793,10 @@ class LinFixedEF(CardEF):
@with_default_category('Linear Fixed EF Commands')
class ShellCommands(CommandSet):
"""Shell commands specific for Linear Fixed EFs."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
dec_hex_parser = argparse.ArgumentParser()
dec_hex_parser.add_argument('--oneline', action='store_true',
help='No JSON pretty-printing, dump as a single line')
@@ -806,8 +819,8 @@ class LinFixedEF(CardEF):
"""Read one or multiple records from a record-oriented EF"""
for r in range(opts.count):
recnr = opts.record_nr + r
(data, _sw) = self._cmd.lchan.read_record(recnr)
if len(data) > 0:
(data, sw) = self._cmd.lchan.read_record(recnr)
if (len(data) > 0):
recstr = str(data)
else:
recstr = "(empty)"
@@ -822,18 +835,18 @@ class LinFixedEF(CardEF):
@cmd2.with_argparser(read_rec_dec_parser)
def do_read_record_decoded(self, opts):
"""Read + decode a record from a record-oriented EF"""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
(data, sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
self._cmd.poutput_json(data, opts.oneline)
read_recs_parser = argparse.ArgumentParser()
@cmd2.with_argparser(read_recs_parser)
def do_read_records(self, _opts):
def do_read_records(self, opts):
"""Read all records from a record-oriented EF"""
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
for recnr in range(1, 1 + num_of_rec):
(data, _sw) = self._cmd.lchan.read_record(recnr)
if len(data) > 0:
(data, sw) = self._cmd.lchan.read_record(recnr)
if (len(data) > 0):
recstr = str(data)
else:
recstr = "(empty)"
@@ -850,7 +863,7 @@ class LinFixedEF(CardEF):
# collect all results in list so they are rendered as JSON list when printing
data_list = []
for recnr in range(1, 1 + num_of_rec):
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
(data, sw) = self._cmd.lchan.read_record_dec(recnr)
data_list.append(data)
self._cmd.poutput_json(data_list, opts.oneline)
@@ -862,7 +875,7 @@ class LinFixedEF(CardEF):
@cmd2.with_argparser(upd_rec_parser)
def do_update_record(self, opts):
"""Update (write) data to a record-oriented EF"""
(data, _sw) = self._cmd.lchan.update_record(opts.record_nr, opts.data)
(data, sw) = self._cmd.lchan.update_record(opts.record_nr, opts.data)
if data:
self._cmd.poutput(data)
@@ -877,12 +890,12 @@ class LinFixedEF(CardEF):
def do_update_record_decoded(self, opts):
"""Encode + Update (write) data to a record-oriented EF"""
if opts.json_path:
(data_json, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
(data_json, sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
js_path_modify(data_json, opts.json_path,
json.loads(opts.data))
else:
data_json = json.loads(opts.data)
(data, _sw) = self._cmd.lchan.update_record_dec(
(data, sw) = self._cmd.lchan.update_record_dec(
opts.record_nr, data_json)
if data:
self._cmd.poutput(data)
@@ -894,7 +907,7 @@ class LinFixedEF(CardEF):
@cmd2.with_argparser(edit_rec_dec_parser)
def do_edit_record_decoded(self, opts):
"""Edit the JSON representation of one record in an editor."""
(orig_json, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
(orig_json, sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
with tempfile.TemporaryDirectory(prefix='pysim_') as dirname:
filename = '%s/file' % dirname
# write existing data as JSON to file
@@ -907,7 +920,7 @@ class LinFixedEF(CardEF):
if edited_json == orig_json:
self._cmd.poutput("Data not modified, skipping write")
else:
(data, _sw) = self._cmd.lchan.update_record_dec(
(data, sw) = self._cmd.lchan.update_record_dec(
opts.record_nr, edited_json)
if data:
self._cmd.poutput_json(data)
@@ -953,7 +966,7 @@ class LinFixedEF(CardEF):
return method(raw_bin_data, record_nr=record_nr)
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -981,7 +994,7 @@ class LinFixedEF(CardEF):
return method(raw_hex_data, record_nr=record_nr)
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -1009,7 +1022,7 @@ class LinFixedEF(CardEF):
return b2h(raw_bin_data)
if self._construct:
return b2h(build_construct(self._construct, abstract_data))
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return b2h(t.to_tlv())
@@ -1037,7 +1050,7 @@ class LinFixedEF(CardEF):
return h2b(method(abstract_data, record_nr=record_nr))
if self._construct:
return build_construct(self._construct, abstract_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return t.to_tlv()
@@ -1100,7 +1113,7 @@ class TransRecEF(TransparentEF):
return method(raw_bin_data)
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -1127,7 +1140,7 @@ class TransRecEF(TransparentEF):
return method(raw_hex_data)
if self._construct:
return parse_construct(self._construct, raw_bin_data)
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_tlv(raw_bin_data)
return t.to_dict()
@@ -1153,7 +1166,7 @@ class TransRecEF(TransparentEF):
return b2h(method(abstract_data))
if self._construct:
return b2h(filter_dict(build_construct(self._construct, abstract_data)))
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return b2h(t.to_tlv())
@@ -1180,7 +1193,7 @@ class TransRecEF(TransparentEF):
return h2b(method(abstract_data))
if self._construct:
return filter_dict(build_construct(self._construct, abstract_data))
if self._tlv:
elif self._tlv:
t = self._tlv() if inspect.isclass(self._tlv) else self._tlv
t.from_dict(abstract_data)
return t.to_tlv()
@@ -1210,6 +1223,9 @@ class BerTlvEF(CardEF):
class ShellCommands(CommandSet):
"""Shell commands specific for BER-TLV EFs."""
def __init__(self):
super().__init__()
retrieve_data_parser = argparse.ArgumentParser()
retrieve_data_parser.add_argument(
'tag', type=auto_int, help='BER-TLV Tag of value to retrieve')
@@ -1217,10 +1233,10 @@ class BerTlvEF(CardEF):
@cmd2.with_argparser(retrieve_data_parser)
def do_retrieve_data(self, opts):
"""Retrieve (Read) data from a BER-TLV EF"""
(data, _sw) = self._cmd.lchan.retrieve_data(opts.tag)
(data, sw) = self._cmd.lchan.retrieve_data(opts.tag)
self._cmd.poutput(data)
def do_retrieve_tags(self, _opts):
def do_retrieve_tags(self, opts):
"""List tags available in a given BER-TLV EF"""
tags = self._cmd.lchan.retrieve_tags()
self._cmd.poutput(tags)
@@ -1233,7 +1249,7 @@ class BerTlvEF(CardEF):
@cmd2.with_argparser(set_data_parser)
def do_set_data(self, opts):
"""Set (Write) data for a given tag in a BER-TLV EF"""
(data, _sw) = self._cmd.lchan.set_data(opts.tag, opts.data)
(data, sw) = self._cmd.lchan.set_data(opts.tag, opts.data)
if data:
self._cmd.poutput(data)
@@ -1244,7 +1260,7 @@ class BerTlvEF(CardEF):
@cmd2.with_argparser(del_data_parser)
def do_delete_data(self, opts):
"""Delete data for a given tag in a BER-TLV EF"""
(data, _sw) = self._cmd.lchan.set_data(opts.tag, None)
(data, sw) = self._cmd.lchan.set_data(opts.tag, None)
if data:
self._cmd.poutput(data)
@@ -1296,7 +1312,7 @@ class CardApplication:
"""
self.name = name
self.adf = adf
self.sw = sw or {}
self.sw = sw or dict()
# back-reference from ADF to Applicaiton
if self.adf:
self.aid = aid or self.adf.aid

View File

@@ -17,13 +17,12 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import io
from copy import deepcopy
from typing import Optional, List, Dict, Tuple
from construct import Optional as COptional
from construct import Struct, GreedyRange, FlagsEnum, Int16ub, Int24ub, Padding, Bit, Const
from construct import *
from copy import deepcopy
from bidict import bidict
from Cryptodome.Random import get_random_bytes
from Cryptodome.Cipher import DES, DES3, AES
from pySim.global_platform.scp import SCP02, SCP03
from pySim.construct import *
from pySim.utils import *
@@ -125,16 +124,15 @@ class KeyInformation(BER_TLV_IE, tag=0xe0, nested=[KeyInformationData]):
pass
# GP v2.3 11.1.9
KeyUsageQualifier = FlagsEnum(StripTrailerAdapter(GreedyBytes, 2),
verification_encryption=0x8000,
computation_decipherment=0x4000,
sm_response=0x2000,
sm_command=0x1000,
confidentiality=0x0800,
crypto_checksum=0x0400,
digital_signature=0x0200,
crypto_authorization=0x0100,
key_agreement=0x0080)
KeyUsageQualifier = Struct('byte1'/FlagsEnum(Byte, verification_encryption=0x80,
computation_decipherment=0x40,
sm_response=0x20,
sm_command=0x10,
confidentiality=0x08,
crypto_checksum=0x04,
digital_signature=0x02,
crypto_authorization=0x01),
'byte2'/COptional(FlagsEnum(Byte, key_agreement=0x80)))
# GP v2.3 11.1.10
KeyAccess = Enum(Byte, sd_and_any_assoc_app=0x00, sd_only=0x01, any_assoc_app_but_not_sd=0x02,
@@ -212,13 +210,13 @@ class SupportedLFDBHAlgorithms(BER_TLV_IE, tag=0x83):
class CiphersForLFDBEncryption(BER_TLV_IE, tag=0x84):
_construct = Enum(Byte, tripledes16=0x01, aes128=0x02, aes192=0x04, aes256=0x08,
icv_supported_for_lfdb=0x80)
CipherSuitesForSignatures = FlagsEnum(StripTrailerAdapter(GreedyBytes, 2),
rsa1024_pkcsv15_sha1=0x0100,
rsa_gt1024_pss_sha256=0x0200,
single_des_plus_final_triple_des_mac_16b=0x0400,
cmac_aes128=0x0800, cmac_aes192=0x1000, cmac_aes256=0x2000,
ecdsa_ecc256_sha256=0x4000, ecdsa_ecc384_sha384=0x8000,
ecdsa_ecc512_sha512=0x0001, ecdsa_ecc_521_sha512=0x0002)
CipherSuitesForSignatures = Struct('byte1'/FlagsEnum(Byte, rsa1024_pkcsv15_sha1=0x01,
rsa_gt1024_pss_sha256=0x02,
single_des_plus_final_triple_des_mac_16b=0x04,
cmac_aes128=0x08, cmac_aes192=0x10, cmac_aes256=0x20,
ecdsa_ecc256_sha256=0x40, ecdsa_ecc384_sha384=0x80),
'byte2'/COptional(FlagsEnum(Byte, ecdsa_ecc512_sha512=0x01,
ecdsa_ecc_521_sha512=0x02)))
class CiphersForTokens(BER_TLV_IE, tag=0x85):
_construct = CipherSuitesForSignatures
class CiphersForReceipts(BER_TLV_IE, tag=0x86):
@@ -389,17 +387,15 @@ class LifeCycleState(BER_TLV_IE, tag=0x9f70):
# Section 11.4.3.1 Table 11-36 + Section 11.1.2
class Privileges(BER_TLV_IE, tag=0xc5):
# we only support 3-byte encoding. Can't use StripTrailerAdapter as length==2 is not permitted. sigh.
_construct = FlagsEnum(Int24ub,
security_domain=0x800000, dap_verification=0x400000,
delegated_management=0x200000, card_lock=0x100000, card_terminate=0x080000,
card_reset=0x040000, cvm_management=0x020000,
mandated_dap_verification=0x010000,
trusted_path=0x8000, authorized_management=0x4000,
token_management=0x2000, global_delete=0x1000, global_lock=0x0800,
global_registry=0x0400, final_application=0x0200, global_service=0x0100,
receipt_generation=0x80, ciphered_load_file_data_block=0x40,
contactless_activation=0x20, contactless_self_activation=0x10)
_construct = Struct('byte1'/FlagsEnum(Byte, security_domain=0x80, dap_verification=0x40,
delegated_management=0x20, card_lock=0x10, card_terminate=0x08,
card_reset=0x04, cvm_management=0x02,
mandated_dap_verification=0x01),
'byte2'/COptional(FlagsEnum(Byte, trusted_path=0x80, authorized_management=0x40,
token_management=0x20, global_delete=0x10, global_lock=0x08,
global_registry=0x04, final_application=0x02, global_service=0x01)),
'byte3'/COptional(FlagsEnum(Byte, receipt_generation=0x80, ciphered_load_file_data_block=0x40,
contactless_activation=0x20, contactless_self_activation=0x10)))
# Section 11.4.3.1 Table 11-36 + Section 11.1.7
class ImplicitSelectionParameter(BER_TLV_IE, tag=0xcf):
@@ -435,23 +431,6 @@ class GpRegistryRelatedData(BER_TLV_IE, tag=0xe3, nested=[ApplicationAID, LifeCy
ExecutableModuleAID, AssociatedSecurityDomainAID]):
pass
# Section 11.6.2.3 / Table 11-58
class SecurityDomainAid(BER_TLV_IE, tag=0x4f):
_construct = GreedyBytes
class LoadFileDataBlockSignature(BER_TLV_IE, tag=0xc3):
_construct = GreedyBytes
class DapBlock(BER_TLV_IE, tag=0xe2, nested=[SecurityDomainAid, LoadFileDataBlockSignature]):
pass
class LoadFileDataBlock(BER_TLV_IE, tag=0xc4):
_construct = GreedyBytes
class Icv(BER_TLV_IE, tag=0xd3):
_construct = GreedyBytes
class CipheredLoadFileDataBlock(BER_TLV_IE, tag=0xd4):
_construct = GreedyBytes
class LoadFile(TLV_IE_Collection, nested=[DapBlock, LoadFileDataBlock, Icv, CipheredLoadFileDataBlock]):
pass
# Application Dedicated File of a Security Domain
class ADF_SD(CardADF):
StoreData = BitStruct('last_block'/Flag,
@@ -464,11 +443,15 @@ class ADF_SD(CardADF):
super().__init__(aid=aid, fid=None, sfid=None, name=name, desc=desc)
self.shell_commands += [self.AddlShellCommands()]
def decode_select_response(self, data_hex: str) -> object:
return decode_select_response(data_hex)
@staticmethod
def decode_select_response(res_hex: str) -> object:
return decode_select_response(res_hex)
@with_default_category('Application-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
get_data_parser = argparse.ArgumentParser()
get_data_parser.add_argument('data_object_name', type=str,
help='Name of the data object to be retrieved from the card')
@@ -484,7 +467,7 @@ class ADF_SD(CardADF):
self._cmd.poutput('Unknown data object "%s", available options: %s' % (tlv_cls_name,
do_names))
return
(data, _sw) = self._cmd.lchan.scc.get_data(cla=0x80, tag=tlv_cls.tag)
(data, sw) = self._cmd.lchan.scc.get_data(cla=0x80, tag=tlv_cls.tag)
ie = tlv_cls()
ie.from_tlv(h2b(data))
self._cmd.poutput_json(ie.to_dict())
@@ -510,19 +493,18 @@ class ADF_SD(CardADF):
def store_data(self, data: bytes, structure:str = 'none', encryption:str = 'none', response_permitted: bool = False) -> bytes:
"""Perform the GlobalPlatform GET DATA command in order to store some card-specific data.
See GlobalPlatform CardSpecification v2.3Section 11.11 for details."""
max_cmd_len = self._cmd.lchan.scc.max_cmd_len
# Table 11-89 of GP Card Specification v2.3
remainder = data
block_nr = 0
response = ''
while len(remainder):
chunk = remainder[:max_cmd_len]
remainder = remainder[max_cmd_len:]
chunk = remainder[:255]
remainder = remainder[255:]
p1b = build_construct(ADF_SD.StoreData,
{'last_block': len(remainder) == 0, 'encryption': encryption,
'structure': structure, 'response': response_permitted})
hdr = "80E2%02x%02x%02x" % (p1b[0], block_nr, len(chunk))
data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(chunk))
data, sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(chunk))
block_nr += 1
response += data
return data
@@ -534,17 +516,12 @@ class ADF_SD(CardADF):
put_key_parser.add_argument('--key-type', choices=KeyType.ksymapping.values(), action='append', required=True, help='Key Type')
put_key_parser.add_argument('--key-data', type=is_hexstr, action='append', required=True, help='Key Data Block')
put_key_parser.add_argument('--key-check', type=is_hexstr, action='append', help='Key Check Value')
put_key_parser.add_argument('--suppress-key-check', action='store_true', help='Suppress generation of Key Check Values')
@cmd2.with_argparser(put_key_parser)
def do_put_key(self, opts):
"""Perform the GlobalPlatform PUT KEY command in order to store a new key on the card.
See GlobalPlatform CardSpecification v2.3 Section 11.8 for details.
The KCV (Key Check Values) can either be explicitly specified using `--key-check`, or will
otherwise be automatically generated for DES and AES keys. You can suppress the latter using
`--suppress-key-check`.
Example (SCP80 KIC/KID/KIK):
put_key --key-version-nr 1 --key-id 0x01 --key-type aes --key-data 000102030405060708090a0b0c0d0e0f
--key-type aes --key-data 101112131415161718191a1b1c1d1e1f
@@ -561,18 +538,9 @@ class ADF_SD(CardADF):
for i in range(0, len(opts.key_type)):
if opts.key_check and len(opts.key_check) > i:
kcv = opts.key_check[i]
elif opts.suppress_key_check:
else:
kcv = ''
else:
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
kcv = b2h(kcv_bin)
if self._cmd.lchan.scc.scp:
# encrypte key data with DEK of current SCP
kcb = b2h(self._cmd.lchan.scc.scp.card_keys.encrypt_key(h2b(opts.key_data[i])))
else:
# (for example) during personalization, DEK might not be required)
kcb = opts.key_data[i]
kdb.append({'key_type': opts.key_type[i], 'kcb': kcb, 'kcv': kcv})
kdb.append({'key_type': opts.key_type[i], 'kcb': opts.key_data[i], 'kcv': kcv})
p2 = opts.key_id
if len(opts.key_type) > 1:
p2 |= 0x80
@@ -588,7 +556,7 @@ class ADF_SD(CardADF):
See GlobalPlatform CardSpecification v2.3 Section 11.8 for details."""
key_data = kvn.to_bytes(1, 'big') + build_construct(ADF_SD.AddlShellCommands.KeyDataBasic, key_dict)
hdr = "80D8%02x%02x%02x" % (old_kvn, kid, len(key_data))
data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(key_data))
data, sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(key_data))
return data
get_status_parser = argparse.ArgumentParser()
@@ -618,7 +586,7 @@ class ADF_SD(CardADF):
while len(remainder):
# tlv sequence, each element is one GpRegistryRelatedData()
grd = GpRegistryRelatedData()
_dec, remainder = grd.from_tlv(remainder)
dec, remainder = grd.from_tlv(remainder)
grd_list.append(grd)
if sw != '6310':
return grd_list
@@ -644,79 +612,19 @@ class ADF_SD(CardADF):
def set_status(self, scope:str, status:str, aid:Hexstr = ''):
SetStatus = Struct(Const(0x80, Byte), Const(0xF0, Byte),
'scope'/SetStatusScope, 'status'/CLifeCycleState,
'aid'/HexAdapter(Prefixed(Int8ub, COptional(GreedyBytes))))
'aid'/HexAdapter(Prefixed(Int8ub, Optional(GreedyBytes))))
apdu = build_construct(SetStatus, {'scope':scope, 'status':status, 'aid':aid})
_data, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(apdu))
data, sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(apdu))
inst_perso_parser = argparse.ArgumentParser()
inst_perso_parser.add_argument('application-aid', type=is_hexstr, help='Application AID')
@cmd2.with_argparser(inst_perso_parser)
def do_install_for_personalization(self, opts):
"""Perform GlobalPlatform INSTALL [for personalization] command in order to inform a Security
"""Perform GlobalPlatform INSTALL [for personalization] command in order toinform a Security
Domain that the following STORE DATA commands are meant for a specific AID (specified here)."""
# Section 11.5.2.3.6 / Table 11-47
self.install(0x20, 0x00, "0000%02x%s000000" % (len(opts.application_aid)//2, opts.application_aid))
inst_inst_parser = argparse.ArgumentParser()
inst_inst_parser.add_argument('--load-file-aid', type=is_hexstr, default='',
help='Executable Load File AID')
inst_inst_parser.add_argument('--module-aid', type=is_hexstr, default='',
help='Executable Module AID')
inst_inst_parser.add_argument('--application-aid', type=is_hexstr, required=True,
help='Application AID')
inst_inst_parser.add_argument('--install-parameters', type=is_hexstr, default='',
help='Install Parameters')
inst_inst_parser.add_argument('--privilege', action='append', dest='privileges', default=[],
choices=Privileges._construct.flags.keys(),
help='Privilege granted to newly installed Application')
inst_inst_parser.add_argument('--install-token', type=is_hexstr, default='',
help='Install Token (Section GPCS C.4.2/C.4.7)')
inst_inst_parser.add_argument('--make-selectable', action='store_true',
help='Install and make selectable')
@cmd2.with_argparser(inst_inst_parser)
def do_install_for_install(self, opts):
"""Perform GlobalPlatform INSTALL [for install] command in order to install an application."""
InstallForInstallCD = Struct('load_file_aid'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'module_aid'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'application_aid'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'privileges'/Prefixed(Int8ub, Privileges._construct),
'install_parameters'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'install_token'/HexAdapter(Prefixed(Int8ub, GreedyBytes)))
p1 = 0x04
if opts.make_selectable:
p1 |= 0x08
decoded = vars(opts)
# convert from list to "true-dict" as required by construct.FlagsEnum
decoded['privileges'] = {x: True for x in decoded['privileges']}
ifi_bytes = build_construct(InstallForInstallCD, decoded)
self.install(p1, 0x00, b2h(ifi_bytes))
inst_load_parser = argparse.ArgumentParser()
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
help='AID of the loded file')
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
help='AID of the Security Domain into which the file shalle be added')
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
help='Load File Data Block Hash')
inst_inst_parser.add_argument('--load-parameters', type=is_hexstr, default='',
help='Load Token (Section GPCS C.4.1)')
inst_inst_parser.add_argument('--load-token', type=is_hexstr, default='',
help='Load Token (Section GPCS C.4.1)')
@cmd2.with_argparser(inst_load_parser)
def do_install_for_load(self, opts):
"""Perform GlobalPlatform INSTALL [for load] command."""
if opts.load_token != '' and opts.load_file_hash == '':
raise ValueError('Load File Data Block Hash is mandatory if a Load Token is present')
InstallForLoadCD = Struct('load_file_aid'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'security_domain_aid'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'load_file_hash'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'load_parameters'/HexAdapter(Prefixed(Int8ub, GreedyBytes)),
'load_token'/HexAdapter(Prefixed(Int8ub, GreedyBytes)))
ifl_bytes = build_construct(InstallForLoadCD, vars(opts))
self.install(0x02, 0x00, b2h(ifl_bytes))
self.install(0x20, 0x00, "0000%02u%s000000" % (len(opts.application_aid)//2, opts.application_aid))
def install(self, p1:int, p2:int, data:Hexstr) -> ResTuple:
cmd_hex = "80E6%02x%02x%02x%s" % (p1, p2, len(data)//2, data)
@@ -747,13 +655,13 @@ class ADF_SD(CardADF):
"""Perform GlobalPlaform DELETE (Key) command.
If both KID and KVN are specified, exactly one key is deleted. If only either of the two is
specified, multiple matching keys may be deleted."""
if opts.key_id is None and opts.key_ver is None:
if opts.key_id == None and opts.key_ver == None:
raise ValueError('At least one of KID or KVN must be specified')
p2 = 0x80 if opts.delete_related_objects else 0x00
cmd = ""
if opts.key_id is not None:
if opts.key_id != None:
cmd += "d001%02x" % opts.key_id
if opts.key_ver is not None:
if opts.key_ver != None:
cmd += "d201%02x" % opts.key_ver
self.delete(0x00, p2, cmd)
@@ -761,37 +669,6 @@ class ADF_SD(CardADF):
cmd_hex = "80E4%02x%02x%02x%s" % (p1, p2, len(data)//2, data)
return self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
load_parser = argparse.ArgumentParser()
# we make this a required --optional argument now, so we can later have other sources for load data
load_parser.add_argument('--from-file', required=True)
@cmd2.with_argparser(load_parser)
def do_load(self, opts):
"""Perform a GlobalPlatform LOAD command. We currently only support loading without DAP and
without ciphering."""
with open(opts.from_file, 'rb') as f:
self.load(f)
def load(self, stream: io.RawIOBase, chunk_len:int = 240):
# we might want to tune chunk_len based on the overhead of the used SCP?
contents = stream.readall()
# build TLV according to 11.6.2.3 / Table 11-58 for unencrypted case
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
# transfer this in vaious chunks to the card
total_size = len(remainder)
block_nr = 0
while len(remainder):
block = remainder[:chunk_len]
remainder = remainder[chunk_len:]
# build LOAD command APDU according to 11.6.2 / Table 11-56
p1 = 0x80 if len(remainder) else 0x00
p2 = block_nr % 256
block_nr += 1
cmd_hex = "80E8%02x%02x%02x%s" % (p1, p2, len(block), b2h(block))
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
self._cmd.poutput("Loaded a total of %u bytes in %u blocks. Don't forget install_for_load now!" % (total_size, block_nr))
est_scp02_parser = argparse.ArgumentParser()
est_scp02_parser.add_argument('--key-ver', type=auto_uint8, required=True,
help='Key Version Number (KVN)')
@@ -837,17 +714,17 @@ class ADF_SD(CardADF):
def _establish_scp(self, scp, host_challenge, security_level):
# perform the common functionality shared by SCP02 and SCP03 establishment
init_update_apdu = scp.gen_init_update_apdu(host_challenge=host_challenge)
init_update_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(init_update_apdu))
init_update_resp, sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(init_update_apdu))
scp.parse_init_update_resp(h2b(init_update_resp))
ext_auth_apdu = scp.gen_ext_auth_apdu(security_level)
_ext_auth_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(ext_auth_apdu))
ext_auth_resp, sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(ext_auth_apdu))
self._cmd.poutput("Successfully established a %s secure channel" % str(scp))
# store a reference to the SCP instance
self._cmd.lchan.scc.scp = scp
self._cmd.update_prompt()
def do_release_scp(self, _opts):
def do_release_scp(self, opts):
"""Release a previously establiehed secure channel."""
if not self._cmd.lchan.scc.scp:
self._cmd.poutput("Cannot release SCP as none is established")
@@ -879,7 +756,7 @@ class CardApplicationISD(CardApplicationSD):
class GpCardKeyset:
"""A single set of GlobalPlatform card keys and the associated KVN."""
def __init__(self, kvn: int, enc: bytes, mac: bytes, dek: bytes):
assert 0 < kvn < 256
assert kvn >= 0 and kvn < 256
assert len(enc) == len(mac) == len(dek)
self.kvn = kvn
self.enc = enc
@@ -888,39 +765,8 @@ class GpCardKeyset:
@classmethod
def from_single_key(cls, kvn: int, base_key: bytes) -> 'GpCardKeyset':
return cls(kvn, base_key, base_key, base_key)
return cls(int, base_key, base_key, base_key)
def __str__(self):
return "%s(KVN=%u, ENC=%s, MAC=%s, DEK=%s)" % (self.__class__.__name__,
self.kvn, b2h(self.enc), b2h(self.mac), b2h(self.dek))
def compute_kcv_des(key:bytes) -> bytes:
# GP Card Spec B.6: For a DES key, the key check value is computed by encrypting 8 bytes, each with
# value '00', with the key to be checked and retaining the 3 highest-order bytes of the encrypted
# result.
plaintext = b'\x00' * 8
cipher = DES3.new(key, DES.MODE_ECB)
return cipher.encrypt(plaintext)
def compute_kcv_aes(key:bytes) -> bytes:
# GP Card Spec B.6: For a AES key, the key check value is computed by encrypting 16 bytes, each with
# value '01', with the key to be checked and retaining the 3 highest-order bytes of the encrypted
# result.
plaintext = b'\x01' * 16
cipher = AES.new(key, AES.MODE_ECB)
return cipher.encrypt(plaintext)
# dict is keyed by the string name of the KeyType enum above in this file
KCV_CALCULATOR = {
'aes': compute_kcv_aes,
'des': compute_kcv_des,
}
def compute_kcv(key_type: str, key: bytes) -> Optional[bytes]:
"""Compute the KCV (Key Check Value) for given key type and key."""
kcv_calculator = KCV_CALCULATOR.get(key_type)
if not kcv_calculator:
return None
else:
return kcv_calculator(key)[:3]

View File

@@ -22,16 +22,16 @@ from Cryptodome.Cipher import DES3, DES
from Cryptodome.Util.strxor import strxor
from construct import Struct, Bytes, Int8ub, Int16ub, Const
from construct import Optional as COptional
from pySim.utils import b2h, bertlv_parse_len, bertlv_encode_len
from pySim.utils import b2h
from pySim.secure_channel import SecureChannel
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def scp02_key_derivation(constant: bytes, counter: int, base_key: bytes) -> bytes:
assert len(constant) == 2
assert(len(constant) == 2)
assert(counter >= 0 and counter <= 65535)
assert len(base_key) == 16
assert(len(base_key) == 16)
derivation_data = constant + counter.to_bytes(2, 'big') + b'\x00' * 12
cipher = DES3.new(base_key, DES.MODE_CBC, b'\x00' * 8)
@@ -58,7 +58,6 @@ class Scp02SessionKeys:
DERIV_CONST_RMAC = b'\x01\x02'
DERIV_CONST_ENC = b'\x01\x82'
DERIV_CONST_DENC = b'\x01\x81'
blocksize = 8
def calc_mac_1des(self, data: bytes, reset_icv: bool = False) -> bytes:
"""Pad and calculate MAC according to B.1.2.2 - Single DES plus final 3DES"""
@@ -155,7 +154,8 @@ class SCP(SecureChannel, abc.ABC):
# only protect those APDUs that actually are global platform commands
if apdu[0] & 0x80:
return self._wrap_cmd_apdu(apdu, *args, **kwargs)
return apdu
else:
return apdu
@abc.abstractmethod
def _wrap_cmd_apdu(self, apdu: bytes, *args, **kwargs) -> bytes:
@@ -174,42 +174,6 @@ class SCP(SecureChannel, abc.ABC):
def gen_ext_auth_apdu(self, security_level: int = 0x01) -> bytes:
pass
def encrypt_key(self, key: bytes) -> bytes:
"""Encrypt a key with the DEK."""
num_pad = len(key) % self.sk.blocksize
if num_pad:
return bertlv_encode_len(len(key)) + self.dek_encrypt(key + b'\x00'*num_pad)
return self.dek_encrypt(key)
def decrypt_key(self, encrypted_key:bytes) -> bytes:
"""Decrypt a key with the DEK."""
if len(encrypted_key) % self.sk.blocksize:
# If the length of the Key Component Block is not a multiple of the block size of the encryption #
# algorithm (i.e. 8 bytes for DES, 16 bytes for AES), then it shall be assumed that the key
# component value was right-padded prior to encryption and that the Key Component Block was
# formatted as described in Table 11-70. In this case, the first byte(s) of the Key Component
# Block provides the actual length of the key component value, which allows recovering the
# clear-text key component value after decryption of the encrypted key component value and removal
# of padding bytes.
decrypted = self.dek_decrypt(encrypted_key)
key_len, remainder = bertlv_parse_len(decrypted)
return remainder[:key_len]
else:
# If the length of the Key Component Block is a multiple of the block size of the encryption
# algorithm (i.e. 8 bytes for DES, 16 bytes for AES), then it shall be assumed that no padding
# bytes were added before encrypting the key component value and that the Key Component Block is
# only composed of the encrypted key component value (as shown in Table 11-71). In this case, the
# clear-text key component value is simply recovered by decrypting the Key Component Block.
return self.dek_decrypt(encrypted_key)
@abc.abstractmethod
def dek_encrypt(self, plaintext:bytes) -> bytes:
pass
@abc.abstractmethod
def dek_decrypt(self, ciphertext:bytes) -> bytes:
pass
class SCP02(SCP):
"""An instance of the GlobalPlatform SCP02 secure channel protocol."""
@@ -218,18 +182,6 @@ class SCP02(SCP):
'seq_counter'/Int16ub, 'card_challenge'/Bytes(6), 'card_cryptogram'/Bytes(8))
kvn_range = [0x20, 0x2f]
def __init__(self, *args, **kwargs):
self.overhead = 8
super().__init__(*args, **kwargs)
def dek_encrypt(self, plaintext:bytes) -> bytes:
cipher = DES.new(self.card_keys.dek, DES.MODE_ECB)
return cipher.encrypt(plaintext)
def dek_decrypt(self, ciphertext:bytes) -> bytes:
cipher = DES.new(self.card_keys.dek, DES.MODE_ECB)
return cipher.decrypt(ciphertext)
def _compute_cryptograms(self, card_challenge: bytes, host_challenge: bytes):
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(host_challenge), b2h(card_challenge))
self.host_cryptogram = self.sk.calc_mac_3des(self.sk.counter.to_bytes(2, 'big') + card_challenge + host_challenge)
@@ -264,7 +216,7 @@ class SCP02(SCP):
mac = self.sk.calc_mac_1des(header + self.host_cryptogram, True)
return bytes([self._cla(True), INS_EXT_AUTH, self.security_level, 0, 16]) + self.host_cryptogram + mac
def _wrap_cmd_apdu(self, apdu: bytes, *args, **kwargs) -> bytes:
def _wrap_cmd_apdu(self, apdu: bytes) -> bytes:
"""Wrap Command APDU for SCP02: calculate MAC and encrypt."""
lc = len(apdu) - 5
assert len(apdu) >= 5, "Wrong APDU length: %d" % len(apdu)
@@ -296,9 +248,9 @@ class SCP02(SCP):
apdu = bytes([self._cla(True, b8)]) + apdu[1:4] + bytes([lc]) + data + mac
return apdu
def unwrap_rsp_apdu(self, sw: bytes, rsp_apdu: bytes) -> bytes:
def unwrap_rsp_apdu(self, sw: bytes, apdu: bytes) -> bytes:
# TODO: Implement R-MAC / R-ENC
return rsp_apdu
return apdu
@@ -312,7 +264,7 @@ def scp03_key_derivation(constant: bytes, context: bytes, base_key: bytes, l: Op
def prf(key: bytes, data:bytes):
return CMAC.new(key, data, AES).digest()
if l is None:
if l == None:
l = len(base_key) * 8
logger.debug("scp03_kdf(constant=%s, context=%s, base_key=%s, l=%u)", b2h(constant), b2h(context), b2h(base_key), l)
@@ -414,17 +366,8 @@ class SCP03(SCP):
def __init__(self, *args, **kwargs):
self.s_mode = kwargs.pop('s_mode', 8)
self.overhead = self.s_mode
super().__init__(*args, **kwargs)
def dek_encrypt(self, plaintext:bytes) -> bytes:
cipher = AES.new(self.card_keys.dek, AES.MODE_CBC, b'\x00'*16)
return cipher.encrypt(plaintext)
def dek_decrypt(self, ciphertext:bytes) -> bytes:
cipher = AES.new(self.card_keys.dek, AES.MODE_CBC, b'\x00'*16)
return cipher.decrypt(ciphertext)
def _compute_cryptograms(self):
logger.debug("host_challenge(%s), card_challenge(%s)", b2h(self.host_challenge), b2h(self.card_challenge))
# Card + Host Authentication Cryptogram: Section 6.2.2.2 + 6.2.2.3
@@ -435,7 +378,7 @@ class SCP03(SCP):
def gen_init_update_apdu(self, host_challenge: Optional[bytes] = None) -> bytes:
"""Generate INITIALIZE UPDATE APDU."""
if host_challenge is None:
if host_challenge == None:
host_challenge = b'\x00' * self.s_mode
if len(host_challenge) != self.s_mode:
raise ValueError('Host Challenge must be %u bytes long' % self.s_mode)
@@ -505,20 +448,20 @@ class SCP03(SCP):
return mapdu
def unwrap_rsp_apdu(self, sw: bytes, rsp_apdu: bytes) -> bytes:
def unwrap_rsp_apdu(self, sw: bytes, apdu: bytes) -> bytes:
# No R-MAC shall be generated and no protection shall be applied to a response that includes an error
# status word: in this case only the status word shall be returned in the response. All status words
# except '9000' and warning status words (i.e. '62xx' and '63xx') shall be interpreted as error status
# words.
logger.debug("unwrap_rsp_apdu(sw=%s, rsp_apdu=%s)", sw, rsp_apdu)
logger.debug("unwrap_rsp_apdu(sw=%s, apdu=%s)", sw, apdu)
if not self.do_rmac:
assert not self.do_renc
return rsp_apdu
return apdu
if sw != b'\x90\x00' and sw[0] not in [0x62, 0x63]:
return rsp_apdu
response_data = rsp_apdu[:-self.s_mode]
rmac = rsp_apdu[-self.s_mode:]
return apdu
response_data = apdu[:-self.s_mode]
rmac = apdu[-self.s_mode:]
rmac_exp = self.sk.calc_rmac(response_data + sw)[:self.s_mode]
if rmac != rmac_exp:
raise ValueError("R-MAC value not matching: received: %s, computed: %s" % (rmac, rmac_exp))

View File

@@ -1,10 +1,13 @@
# -*- coding: utf-8 -*-
# without this, pylint will fail when inner classes are used
# within the 'nested' kwarg of our TlvMeta metaclass on python 3.7 :(
# pylint: disable=undefined-variable
"""
The File (and its derived classes) uses the classes of pySim.filesystem in
order to describe the files specified in UIC Reference P38 T 9001 5.0 "FFFIS for GSM-R SIM Cards"
"""
# without this, pylint will fail when inner classes are used
# within the 'nested' kwarg of our TlvMeta metaclass on python 3.7 :(
# pylint: disable=undefined-variable
#
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
@@ -25,13 +28,16 @@ order to describe the files specified in UIC Reference P38 T 9001 5.0 "FFFIS for
from pySim.utils import *
#from pySim.tlv import *
from struct import pack, unpack
from construct import Struct, Bytes, Int8ub, Int16ub, Int24ub, Int32ub, FlagsEnum
from construct import *
from construct import Optional as COptional
from pySim.construct import *
import enum
from pySim.profile import CardProfileAddon
from pySim.filesystem import *
import pySim.ts_51_011
######################################################################
# DF.EIRENE (FFFIS for GSM-R SIM Cards)
@@ -71,15 +77,15 @@ class PlConfAdapter(Adapter):
num = int(obj) & 0x7
if num == 0:
return 'None'
if num == 1:
elif num == 1:
return 4
if num == 2:
elif num == 2:
return 3
if num == 3:
elif num == 3:
return 2
if num == 4:
elif num == 4:
return 1
if num == 5:
elif num == 5:
return 0
def _encode(self, obj, context, path):
@@ -88,13 +94,13 @@ class PlConfAdapter(Adapter):
obj = int(obj)
if obj == 4:
return 1
if obj == 3:
elif obj == 3:
return 2
if obj == 2:
elif obj == 2:
return 3
if obj == 1:
elif obj == 1:
return 4
if obj == 0:
elif obj == 0:
return 5
@@ -105,19 +111,19 @@ class PlCallAdapter(Adapter):
num = int(obj) & 0x7
if num == 0:
return 'None'
if num == 1:
elif num == 1:
return 4
if num == 2:
elif num == 2:
return 3
if num == 3:
elif num == 3:
return 2
if num == 4:
elif num == 4:
return 1
if num == 5:
elif num == 5:
return 0
if num == 6:
elif num == 6:
return 'B'
if num == 7:
elif num == 7:
return 'A'
def _encode(self, obj, context, path):
@@ -125,17 +131,17 @@ class PlCallAdapter(Adapter):
return 0
if obj == 4:
return 1
if obj == 3:
elif obj == 3:
return 2
if obj == 2:
elif obj == 2:
return 3
if obj == 1:
elif obj == 1:
return 4
if obj == 0:
elif obj == 0:
return 5
if obj == 'B':
elif obj == 'B':
return 6
if obj == 'A':
elif obj == 'A':
return 7

View File

@@ -23,9 +23,9 @@ telecom-related protocol traces over UDP.
#
import socket
from typing import List, Dict, Optional
from construct import Optional as COptional
from construct import Int8ub, Int8sb, Int32ub, BitStruct, Enum, GreedyBytes, Struct, Switch
from construct import this, PaddedString
from construct import *
from pySim.construct import *
# The root definition of GSMTAP can be found at

View File

@@ -17,7 +17,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from construct import GreedyBytes, GreedyString
from construct import *
from pySim.construct import *
from pySim.utils import *
from pySim.filesystem import *

View File

@@ -1,3 +1,8 @@
# coding=utf-8
import json
import pprint
import jsonpath_ng
"""JSONpath utility functions as needed within pysim.
As pySim-sell has the ability to represent SIM files as JSON strings,
@@ -5,8 +10,6 @@ adding JSONpath allows us to conveniently modify individual sub-fields
of a file or record in its JSON representation.
"""
import jsonpath_ng
# (C) 2021 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify

View File

@@ -15,16 +15,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.construct import *
from pySim.utils import b2h
from pySim.sms import UserDataHeader
from construct import *
import zlib
import abc
import struct
from typing import Optional
from construct import Enum, Int8ub, Int16ub, Struct, Bytes, GreedyBytes, BitsInteger, BitStruct
from construct import Flag, Padding, Switch, this
from pySim.construct import *
from pySim.utils import b2h
from pySim.sms import UserDataHeader
# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
@@ -114,12 +112,12 @@ class OtaKeyset:
@property
def auth(self):
"""Return an instance of the matching OtaAlgoAuth."""
return OtaAlgoAuth.from_keyset(self)
return OtaAlgoAuth.fromKeyset(self)
@property
def crypt(self):
"""Return an instance of the matching OtaAlgoCrypt."""
return OtaAlgoCrypt.from_keyset(self)
return OtaAlgoCrypt.fromKeyset(self)
class OtaCheckError(Exception):
pass
@@ -130,24 +128,26 @@ class OtaDialect(abc.ABC):
def _compute_sig_len(self, spi:SPI):
if spi['rc_cc_ds'] == 'no_rc_cc_ds':
return 0
if spi['rc_cc_ds'] == 'rc': # CRC-32
elif spi['rc_cc_ds'] == 'rc': # CRC-32
return 4
if spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
# TODO: this is not entirely correct, as in AES case it could be 4 or 8
return 8
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
else:
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
@abc.abstractmethod
def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
pass
@abc.abstractmethod
def decode_resp(self, otak: OtaKeyset, spi: dict, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
"""Decode a response into a response packet and, if indicted (by a
response status of `"por_ok"`) a decoded response.
The response packet's common characteristics are not fully determined,
and (so far) completely proprietary per dialect."""
pass
from Cryptodome.Cipher import DES, DES3, AES
@@ -190,7 +190,7 @@ class OtaAlgoCrypt(OtaAlgo, abc.ABC):
def encrypt(self, data:bytes) -> bytes:
"""Encrypt given input bytes using the key material given in constructor."""
padded_data = self.pad_to_blocksize(data)
return self._encrypt(padded_data)
return self._encrypt(data)
def decrypt(self, data:bytes) -> bytes:
"""Decrypt given input bytes using the key material given in constructor."""
@@ -199,13 +199,15 @@ class OtaAlgoCrypt(OtaAlgo, abc.ABC):
@abc.abstractmethod
def _encrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@abc.abstractmethod
def _decrypt(self, data:bytes) -> bytes:
"""Actual implementation, to be implemented by derived class."""
pass
@classmethod
def from_keyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
"""Resolve the class for the encryption algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_crypt:
@@ -237,7 +239,7 @@ class OtaAlgoAuth(OtaAlgo, abc.ABC):
pass
@classmethod
def from_keyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
"""Resolve the class for the authentication algorithm of otak and instantiate it."""
for subc in cls.__subclasses__():
if subc.enum_name == otak.algo_auth:
@@ -397,7 +399,7 @@ class OtaDialectSms(OtaDialect):
# POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
if data[0] != 0x02:
raise ValueError('Unexpected UDL=0x%02x' % data[0])
udhd, remainder = UserDataHeader.from_bytes(data)
udhd, remainder = UserDataHeader.fromBytes(data)
if not udhd.has_ie(0x71):
raise ValueError('RPI 0x71 not found in UDH')
rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered

View File

@@ -21,13 +21,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pySim.commands import SimCardCommands
from pySim.filesystem import CardApplication, interpret_sw
from pySim.utils import all_subclasses
import abc
import operator
from typing import List
from pySim.commands import SimCardCommands
from pySim.filesystem import CardApplication, interpret_sw
from pySim.utils import all_subclasses
def _mf_select_test(scc: SimCardCommands,
cla_byte: str, sel_ctrl: str,
@@ -166,7 +166,7 @@ class CardProfile:
return None
def add_addon(self, addon: 'CardProfileAddon'):
assert addon not in self.addons
assert(addon not in self.addons)
# we don't install any additional files, as that is happening in the RuntimeState.
self.addons.append(addon)
@@ -186,6 +186,7 @@ class CardProfileAddon(abc.ABC):
self.desc = kw.get("desc", None)
self.files_in_mf = kw.get("files_in_mf", [])
self.shell_cmdsets = kw.get("shell_cmdsets", [])
pass
def __str__(self):
return self.name
@@ -193,3 +194,4 @@ class CardProfileAddon(abc.ABC):
@abc.abstractmethod
def probe(self, card: 'CardBase') -> bool:
"""Probe a given card to determine whether or not this add-on is present/supported."""
pass

View File

@@ -19,7 +19,7 @@
from typing import Optional, Tuple
from pySim.utils import h2b, i2h, is_hex, bertlv_parse_one, Hexstr
from pySim.utils import sw_match, h2b, i2h, is_hex, bertlv_parse_one, Hexstr
from pySim.exceptions import *
from pySim.filesystem import *
@@ -29,10 +29,11 @@ def lchan_nr_from_cla(cla: int) -> int:
if cla >> 4 in [0x0, 0xA, 0x8]:
# Table 10.3
return cla & 0x03
if cla & 0xD0 in [0x40, 0xC0]:
elif cla & 0xD0 in [0x40, 0xC0]:
# Table 10.4a
return 4 + (cla & 0x0F)
raise ValueError('Could not determine logical channel for CLA=%2X' % cla)
else:
raise ValueError('Could not determine logical channel for CLA=%2X' % cla)
class RuntimeState:
"""Represent the runtime state of a session with a card."""
@@ -115,7 +116,7 @@ class RuntimeState:
# no problem when we access the card object directly without caring
# about updating other states. For normal selects at runtime, the
# caller must use the lchan provided methods select or select_file!
_data, sw = self.card.select_adf_by_aid(f.aid)
data, sw = self.card.select_adf_by_aid(f.aid)
self.selected_adf = f
if sw == "9000":
print(" %s: %s" % (f.name, f.aid))
@@ -263,20 +264,20 @@ class RuntimeLchan:
# run time. In case the file does not exist on the card, we just abort.
# The state on the card (selected file/application) wont't be changed,
# so we do not have to update any state in that case.
(data, _sw) = self.scc.select_file(fid)
(data, sw) = self.scc.select_file(fid)
except SwMatchError as swm:
self._select_post(cmd_app)
k = self.interpret_sw(swm.sw_actual)
if not k:
raise swm
raise RuntimeError("%s: %s - %s" % (swm.sw_actual, k[0], k[1])) from swm
raise(swm)
raise RuntimeError("%s: %s - %s" % (swm.sw_actual, k[0], k[1]))
select_resp = self.selected_file.decode_select_response(data)
if select_resp['file_descriptor']['file_descriptor_byte']['file_type'] == 'df':
if (select_resp['file_descriptor']['file_descriptor_byte']['file_type'] == 'df'):
f = CardDF(fid=fid, sfid=None, name="DF." + str(fid).upper(),
desc="dedicated file, manually added at runtime")
else:
if select_resp['file_descriptor']['file_descriptor_byte']['structure'] == 'transparent':
if (select_resp['file_descriptor']['file_descriptor_byte']['structure'] == 'transparent'):
f = TransparentEF(fid=fid, sfid=None, name="EF." + str(fid).upper(),
desc="elementary file, manually added at runtime")
else:
@@ -339,13 +340,13 @@ class RuntimeLchan:
# card directly since this would lead into an incoherence of the
# card state and the state of the lchan.
if isinstance(f, CardADF):
(data, _sw) = self.rs.card.select_adf_by_aid(f.aid, scc=self.scc)
(data, sw) = self.rs.card.select_adf_by_aid(f.aid, scc=self.scc)
else:
(data, _sw) = self.scc.select_file(f.fid)
(data, sw) = self.scc.select_file(f.fid)
selected_file = f
except SwMatchError as swm:
self._select_post(cmd_app, selected_file, data)
raise swm
raise(swm)
self._select_post(cmd_app, f, data)
@@ -393,7 +394,7 @@ class RuntimeLchan:
def status(self):
"""Request STATUS (current selected file FCP) from card."""
(data, _sw) = self.scc.status()
(data, sw) = self.scc.status()
return self.selected_file.decode_select_response(data)
def get_file_for_selectable(self, name: str):
@@ -523,8 +524,8 @@ class RuntimeLchan:
"""
if not isinstance(self.selected_file, BerTlvEF):
raise TypeError("Only works with BER-TLV EF")
data, _sw = self.scc.retrieve_data(self.selected_file.fid, 0x5c)
_tag, _length, value, _remainder = bertlv_parse_one(h2b(data))
data, sw = self.scc.retrieve_data(self.selected_file.fid, 0x5c)
tag, length, value, remainder = bertlv_parse_one(h2b(data))
return list(value)
def set_data(self, tag: int, data_hex: str):
@@ -543,3 +544,6 @@ class RuntimeLchan:
if cmd_app and self.selected_file.shell_commands:
for c in self.selected_file.shell_commands:
cmd_app.unregister_command_set(c)

View File

@@ -20,7 +20,7 @@
import typing
import abc
from bidict import bidict
from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger
from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
from construct import Struct, Enum, Tell, BitStruct, this, Padding
from construct import Prefixed, GreedyRange, GreedyBytes
@@ -51,13 +51,13 @@ class UserDataHeader:
return False
@classmethod
def from_bytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
if isinstance(inb, str):
inb = h2b(inb)
res = cls._construct.parse(inb)
return cls(res['ies']), res['data']
def to_bytes(self) -> bytes:
def toBytes(self) -> bytes:
return self._construct.build({'ies':self.ies, 'data':b''})
@@ -117,7 +117,7 @@ class AddressField:
return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)
@classmethod
def from_bytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
"""Construct an AddressField instance from the binary T-PDU address format."""
if isinstance(inb, str):
inb = h2b(inb)
@@ -129,16 +129,16 @@ class AddressField:
return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]
@classmethod
def from_smpp(cls, addr, ton, npi) -> 'AddressField':
def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
"""Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
# return the resulting instance
return cls(addr.decode('ascii'), AddressField.smpp_map_ton[ton.name], AddressField.smpp_map_npi[npi.name])
def to_smpp(self):
def toSmpp(self):
"""Return smpp.pdo.*.source,dest}_addr_{,ton,npi} attributes for given AddressField."""
return (self.digits, self.smpp_map_ton.inverse[self.ton], self.smpp_map_npi.inverse[self.npi])
def to_bytes(self) -> bytes:
def toBytes(self) -> bytes:
"""Encode the AddressField into the binary representation as used in T-PDU."""
num_digits = len(self.digits)
if num_digits % 2:
@@ -185,12 +185,13 @@ class SMS_DELIVER(SMS_TPDU):
return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)
@classmethod
def from_bytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
if isinstance(inb, str):
inb = h2b(inb)
flags = inb[0]
d = SMS_DELIVER.flags_construct.parse(inb)
oa, remainder = AddressField.from_bytes(inb[1:])
oa, remainder = AddressField.fromBytes(inb[1:])
d['tp_oa'] = oa
offset = 0
d['tp_pid'] = remainder[offset]
@@ -205,7 +206,7 @@ class SMS_DELIVER(SMS_TPDU):
d['tp_ud'] = remainder[offset:]
return cls(**d)
def to_bytes(self) -> bytes:
def toBytes(self) -> bytes:
"""Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
outb = bytearray()
d = {
@@ -214,7 +215,7 @@ class SMS_DELIVER(SMS_TPDU):
}
flags = SMS_DELIVER.flags_construct.build(d)
outb.extend(flags)
outb.extend(self.tp_oa.to_bytes())
outb.extend(self.tp_oa.toBytes())
outb.append(self.tp_pid)
outb.append(self.tp_dcs)
outb.extend(self.tp_scts)
@@ -224,18 +225,18 @@ class SMS_DELIVER(SMS_TPDU):
return outb
@classmethod
def from_smpp(cls, smpp_pdu) -> 'SMS_DELIVER':
def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu."""
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
return cls.from_smpp_submit(smpp_pdu)
return cls.fromSmppSubmit(smpp_pdu)
else:
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
@classmethod
def from_smpp_submit(cls, smpp_pdu) -> 'SMS_DELIVER':
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
"""Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
tp_oa = AddressField.from_smpp(smpp_pdu.params['source_addr'],
tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
smpp_pdu.params['source_addr_ton'],
smpp_pdu.params['source_addr_npi'])
tp_ud = smpp_pdu.params['short_message']
@@ -275,7 +276,7 @@ class SMS_SUBMIT(SMS_TPDU):
return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)
@classmethod
def from_bytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
"""Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU."""
offset = 0
if isinstance(inb, str):
@@ -284,7 +285,7 @@ class SMS_SUBMIT(SMS_TPDU):
offset += 1
d['tp_mr']= inb[offset]
offset += 1
da, remainder = AddressField.from_bytes(inb[2:])
da, remainder = AddressField.fromBytes(inb[2:])
d['tp_da'] = da
offset = 0
@@ -302,10 +303,12 @@ class SMS_SUBMIT(SMS_TPDU):
# TODO: further decode
d['tp_vp'] = remainder[offset:offset+7]
offset += 7
pass
elif d['tp_vpf'] == 'absolute':
# TODO: further decode
d['tp_vp'] = remainder[offset:offset+7]
offset += 7
pass
else:
raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
d['tp_udl'] = remainder[offset]
@@ -313,7 +316,7 @@ class SMS_SUBMIT(SMS_TPDU):
d['tp_ud'] = remainder[offset:]
return cls(**d)
def to_bytes(self) -> bytes:
def toBytes(self) -> bytes:
"""Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
outb = bytearray()
d = {
@@ -323,7 +326,7 @@ class SMS_SUBMIT(SMS_TPDU):
flags = SMS_SUBMIT.flags_construct.build(d)
outb.extend(flags)
outb.append(self.tp_mr)
outb.extend(self.tp_da.to_bytes())
outb.extend(self.tp_da.toBytes())
outb.append(self.tp_pid)
outb.append(self.tp_dcs)
if self.tp_vpf != 'none':
@@ -333,20 +336,20 @@ class SMS_SUBMIT(SMS_TPDU):
return outb
@classmethod
def from_smpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
"""Construct a SMS_SUBMIT instance from the format used by smpp.pdu."""
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
return cls.from_smpp_submit(smpp_pdu)
return cls.fromSmppSubmit(smpp_pdu)
else:
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
@classmethod
def from_smpp_submit(cls, smpp_pdu) -> 'SMS_SUBMIT':
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
"""Construct a SMS_SUBMIT instance from the submit format used by smpp.pdu."""
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
tp_da = AddressField.from_smpp(smpp_pdu.params['destination_addr'],
smpp_pdu.params['dest_addr_ton'],
smpp_pdu.params['dest_addr_npi'])
tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
smpp_pdu.params['dest_addr_ton'],
smpp_pdu.params['dest_addr_npi'])
tp_ud = smpp_pdu.params['short_message']
#vp_smpp = smpp_pdu.params['validity_period']
#if not vp_smpp:
@@ -367,7 +370,7 @@ class SMS_SUBMIT(SMS_TPDU):
}
return cls(**d)
def to_smpp(self) -> pdu_types.PDU:
def toSmpp(self) -> pdu_types.PDU:
"""Translate a SMS_SUBMIT instance to a smpp.pdu.operations.SubmitSM instance."""
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
@@ -379,7 +382,7 @@ class SMS_SUBMIT(SMS_TPDU):
if self.tp_dcs != 0xF6:
raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
(daddr, ton, npi) = self.tp_da.to_smpp()
(daddr, ton, npi) = self.tp_da.toSmpp()
return operations.SubmitSM(service_type='',
source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
source_addr_npi=pdu_types.AddrNpi.UNKNOWN,

View File

@@ -17,15 +17,14 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from struct import unpack
from construct import FlagsEnum, Byte, Struct, Int8ub, Bytes, Mapping, Enum, Padding, BitsInteger
from construct import Bit, this, Int32ub, Int16ub, Nibble, BytesInteger, GreedyRange
from construct import Optional as COptional
from pytlv.TLV import *
from struct import pack, unpack
from pySim.utils import *
from pySim.filesystem import *
from pySim.runtime import RuntimeState
from pySim.ts_102_221 import CardProfileUICC
from pySim.construct import *
from construct import *
import pySim
key_type2str = {
@@ -71,7 +70,7 @@ class EF_PIN(TransparentEF):
'attempts_remaining'/Int8ub,
'maximum_attempts'/Int8ub,
'pin'/HexAdapter(Rpad(Bytes(8))),
'puk'/COptional(PukStruct))
'puk'/Optional(PukStruct))
class EF_MILENAGE_CFG(TransparentEF):

View File

@@ -16,18 +16,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect
import abc
import re
from typing import List, Tuple
from typing import Optional, List, Dict, Any, Tuple
from bidict import bidict
from construct import *
from pySim.utils import bertlv_encode_len, bertlv_parse_len, bertlv_encode_tag, bertlv_parse_tag
from pySim.utils import comprehensiontlv_encode_tag, comprehensiontlv_parse_tag
from pySim.utils import bertlv_parse_tag_raw, comprehensiontlv_parse_tag_raw
from pySim.utils import dgi_parse_tag_raw, dgi_parse_len, dgi_encode_tag, dgi_encode_len
from pySim.construct import build_construct, parse_construct
from pySim.construct import build_construct, parse_construct, LV, HexAdapter, BcdAdapter, BitsRFU, GsmStringAdapter
from pySim.exceptions import *
import inspect
import abc
import re
def camel_to_snake(name):
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
@@ -37,9 +41,9 @@ class TlvMeta(abc.ABCMeta):
"""Metaclass which we use to set some class variables at the time of defining a subclass.
This allows us to create subclasses for each TLV/IE type, where the class represents fixed
parameters like the tag/type and instances of it represent the actual TLV data."""
def __new__(mcs, name, bases, namespace, **kwargs):
#print("TlvMeta_new_(mcs=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (mcs, name, bases, namespace, kwargs))
x = super().__new__(mcs, name, bases, namespace)
def __new__(metacls, name, bases, namespace, **kwargs):
#print("TlvMeta_new_(metacls=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (metacls, name, bases, namespace, kwargs))
x = super().__new__(metacls, name, bases, namespace)
# this becomes a _class_ variable, not an instance variable
x.tag = namespace.get('tag', kwargs.get('tag', None))
x.desc = namespace.get('desc', kwargs.get('desc', None))
@@ -60,9 +64,9 @@ class TlvCollectionMeta(abc.ABCMeta):
"""Metaclass which we use to set some class variables at the time of defining a subclass.
This allows us to create subclasses for each Collection type, where the class represents fixed
parameters like the nested IE classes and instances of it represent the actual TLV data."""
def __new__(mcs, name, bases, namespace, **kwargs):
#print("TlvCollectionMeta_new_(mcs=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (mcs, name, bases, namespace, kwargs))
x = super().__new__(mcs, name, bases, namespace)
def __new__(metacls, name, bases, namespace, **kwargs):
#print("TlvCollectionMeta_new_(metacls=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (metacls, name, bases, namespace, kwargs))
x = super().__new__(metacls, name, bases, namespace)
# this becomes a _class_ variable, not an instance variable
x.possible_nested = namespace.get('nested', kwargs.get('nested', None))
return x
@@ -83,7 +87,7 @@ class Transcodable(abc.ABC):
def to_bytes(self, context: dict = {}) -> bytes:
"""Convert from internal representation to binary bytes. Store the binary result
in the internal state and return it."""
if self.decoded is None:
if self.decoded == None:
do = b''
elif self._construct:
do = build_construct(self._construct, self.decoded, context)
@@ -164,7 +168,10 @@ class IE(Transcodable, metaclass=TlvMeta):
def is_constructed(self):
"""Is this IE constructed by further nested IEs?"""
return bool(len(self.children) > 0)
if len(self.children):
return True
else:
return False
@abc.abstractmethod
def to_ie(self, context: dict = {}) -> bytes:
@@ -193,6 +200,9 @@ class IE(Transcodable, metaclass=TlvMeta):
class TLV_IE(IE):
"""Abstract base class for various TLV type Information Elements."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def _compute_tag(self) -> int:
"""Compute the tag (sometimes the tag encodes part of the value)."""
return self.tag
@@ -245,6 +255,9 @@ class TLV_IE(IE):
class BER_TLV_IE(TLV_IE):
"""TLV_IE formatted as ASN.1 BER described in ITU-T X.690 8.1.2."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@classmethod
def _decode_tag(cls, do: bytes) -> Tuple[dict, bytes]:
return bertlv_parse_tag(do)
@@ -293,6 +306,9 @@ class COMPR_TLV_IE(TLV_IE):
class DGI_TLV_IE(TLV_IE):
"""TLV_IE formated as GlobalPlatform Systems Scripting Language Specification v1.1.0 Annex B."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@classmethod
def _parse_tag_raw(cls, do: bytes) -> Tuple[int, bytes]:
return dgi_parse_tag_raw(do)
@@ -364,14 +380,14 @@ class TLV_IE_Collection(metaclass=TlvCollectionMeta):
while len(remainder):
context['siblings'] = res
# obtain the tag at the start of the remainder
tag, _r = first._parse_tag_raw(remainder)
if tag is None:
tag, r = first._parse_tag_raw(remainder)
if tag == None:
break
if tag in self.members_by_tag:
cls = self.members_by_tag[tag]
# create an instance and parse accordingly
inst = cls()
_dec, remainder = inst.from_tlv(remainder, context=context)
dec, remainder = inst.from_tlv(remainder, context=context)
res.append(inst)
else:
# unknown tag; create the related class on-the-fly using the same base class
@@ -382,7 +398,7 @@ class TLV_IE_Collection(metaclass=TlvCollectionMeta):
cls._to_bytes = lambda s: bytes.fromhex(s.decoded['raw'])
# create an instance and parse accordingly
inst = cls()
_dec, remainder = inst.from_tlv(remainder, context=context)
dec, remainder = inst.from_tlv(remainder, context=context)
res.append(inst)
self.children = res
return res
@@ -443,7 +459,7 @@ def flatten_dict_lists(inp):
return True
def are_elements_unique(lod):
set_of_keys = {list(x.keys())[0] for x in lod}
set_of_keys = set([list(x.keys())[0] for x in lod])
return len(lod) == len(set_of_keys)
if isinstance(inp, list):
@@ -455,10 +471,10 @@ def flatten_dict_lists(inp):
newdict[key] = e[key]
inp = newdict
# process result as any native dict
return {k:flatten_dict_lists(v) for k,v in inp.items()}
return {k:flatten_dict_lists(inp[k]) for k in inp.keys()}
else:
return [flatten_dict_lists(x) for x in inp]
elif isinstance(inp, dict):
return {k:flatten_dict_lists(v) for k,v in inp.items()}
return {k:flatten_dict_lists(inp[k]) for k in inp.keys()}
else:
return inp

View File

@@ -136,7 +136,7 @@ class LinkBase(abc.ABC):
# available. There are two SWs commonly used for this 9fxx (sim) and 61xx (usim), where
# xx is the number of response bytes available.
# See also:
if sw is not None:
if (sw is not None):
while ((sw[0:2] == '9f') or (sw[0:2] == '61')):
# SW1=9F: 3GPP TS 51.011 9.4.1, Responses to commands which are correctly executed
# SW1=61: ISO/IEC 7816-4, Table 5 — General meaning of the interindustry values of SW1-SW2

View File

@@ -24,7 +24,7 @@ import argparse
from typing import Optional
from pySim.transport import LinkBase
from pySim.exceptions import ReaderError, ProtocolError
from pySim.exceptions import *
from pySim.utils import h2b, b2h, Hexstr, ResTuple
@@ -58,9 +58,9 @@ class L1CTLMessageReset(L1CTLMessage):
L1CTL_RES_T_FULL = 0x01
L1CTL_RES_T_SCHED = 0x02
def __init__(self, ttype=L1CTL_RES_T_FULL):
super().__init__(self.L1CTL_RESET_REQ)
self.data += struct.pack("Bxxx", ttype)
def __init__(self, type=L1CTL_RES_T_FULL):
super(L1CTLMessageReset, self).__init__(self.L1CTL_RESET_REQ)
self.data += struct.pack("Bxxx", type)
class L1CTLMessageSIM(L1CTLMessage):
@@ -70,7 +70,7 @@ class L1CTLMessageSIM(L1CTLMessage):
L1CTL_SIM_CONF = 0x17
def __init__(self, pdu):
super().__init__(self.L1CTL_SIM_REQ)
super(L1CTLMessageSIM, self).__init__(self.L1CTL_SIM_REQ)
self.data += pdu

View File

@@ -17,15 +17,15 @@
#
import logging as log
import serial
import time
import re
import argparse
from typing import Optional
import serial
from pySim.utils import Hexstr, ResTuple
from pySim.transport import LinkBase
from pySim.exceptions import ReaderError, ProtocolError
from pySim.exceptions import *
# HACK: if somebody needs to debug this thing
# log.root.setLevel(log.DEBUG)
@@ -57,7 +57,7 @@ class ModemATCommandLink(LinkBase):
def send_at_cmd(self, cmd, timeout=0.2, patience=0.002):
# Convert from string to bytes, if needed
bcmd = cmd if isinstance(cmd, bytes) else cmd.encode()
bcmd = cmd if type(cmd) is bytes else cmd.encode()
bcmd += b'\r'
# Clean input buffer from previous/unexpected data
@@ -67,9 +67,9 @@ class ModemATCommandLink(LinkBase):
log.debug('Sending AT command: %s', cmd)
try:
wlen = self._sl.write(bcmd)
assert wlen == len(bcmd)
except Exception as exc:
raise ReaderError('Failed to send AT command: %s' % cmd) from exc
assert(wlen == len(bcmd))
except:
raise ReaderError('Failed to send AT command: %s' % cmd)
rsp = b''
its = 1
@@ -91,7 +91,8 @@ class ModemATCommandLink(LinkBase):
break
time.sleep(patience)
its += 1
log.debug('Command took %0.6fs (%d cycles a %fs)', time.time() - t_start, its, patience)
log.debug('Command took %0.6fs (%d cycles a %fs)',
time.time() - t_start, its, patience)
if self._echo:
# Skip echo chars
@@ -119,10 +120,11 @@ class ModemATCommandLink(LinkBase):
if result[-1] == b'OK':
self._echo = False
return
if result[-1] == b'AT\r\r\nOK':
elif result[-1] == b'AT\r\r\nOK':
self._echo = True
return
raise ReaderError('Interface \'%s\' does not respond to \'AT\' command' % self._device)
raise ReaderError(
'Interface \'%s\' does not respond to \'AT\' command' % self._device)
def reset_card(self):
# Reset the modem, just to be sure
@@ -133,7 +135,7 @@ class ModemATCommandLink(LinkBase):
if self.send_at_cmd('AT+CSIM=?') != [b'OK']:
raise ReaderError('The modem does not seem to support SIM access')
log.info('Modem at \'%s\' is ready!', self._device)
log.info('Modem at \'%s\' is ready!' % self._device)
def connect(self):
pass # Nothing to do really ...
@@ -163,9 +165,9 @@ class ModemATCommandLink(LinkBase):
# Make sure that the response has format: b'+CSIM: %d,\"%s\"'
try:
result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
(_rsp_pdu_len, rsp_pdu) = result.groups()
except Exception as exc:
raise ReaderError('Failed to parse response from modem: %s' % rsp) from exc
(rsp_pdu_len, rsp_pdu) = result.groups()
except:
raise ReaderError('Failed to parse response from modem: %s' % rsp)
# TODO: make sure we have at least SW
data = rsp_pdu[:-4].decode().lower()

View File

@@ -19,11 +19,11 @@
import argparse
import re
from typing import Optional
from typing import Optional, Union
from smartcard.CardConnection import CardConnection
from smartcard.CardRequest import CardRequest
from smartcard.Exceptions import NoCardException, CardRequestTimeoutException, CardConnectionException
from smartcard.Exceptions import NoCardException, CardRequestTimeoutException, CardConnectionException, CardConnectionException
from smartcard.System import readers
from pySim.exceptions import NoCardError, ProtocolError, ReaderError
@@ -63,14 +63,15 @@ class PcscSimLink(LinkBase):
self._con.disconnect()
except:
pass
return
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
cr = CardRequest(readers=[self._reader],
timeout=timeout, newcardonly=newcardonly)
try:
cr.waitforcard()
except CardRequestTimeoutException as exc:
raise NoCardError() from exc
except CardRequestTimeoutException:
raise NoCardError()
self.connect()
def connect(self):
@@ -81,10 +82,10 @@ class PcscSimLink(LinkBase):
# Explicitly select T=0 communication protocol
self._con.connect(CardConnection.T0_protocol)
except CardConnectionException as exc:
raise ProtocolError() from exc
except NoCardException as exc:
raise NoCardError() from exc
except CardConnectionException:
raise ProtocolError()
except NoCardException:
raise NoCardError()
def get_atr(self) -> Hexstr:
return self._con.getATR()

View File

@@ -16,11 +16,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import serial
import time
import os
import argparse
from typing import Optional
import serial
from pySim.exceptions import NoCardError, ProtocolError
from pySim.transport import LinkBase
@@ -51,7 +51,7 @@ class SerialSimLink(LinkBase):
self._atr = None
def __del__(self):
if hasattr(self, "_sl"):
if (hasattr(self, "_sl")):
self._sl.close()
def wait_for_card(self, timeout: Optional[int] = None, newcardonly: bool = False):
@@ -62,7 +62,8 @@ class SerialSimLink(LinkBase):
self.reset_card()
if not newcardonly:
return
existing = True
else:
existing = True
except NoCardError:
pass
@@ -85,7 +86,7 @@ class SerialSimLink(LinkBase):
# Tolerate a couple of protocol error ... can happen if
# we try when the card is 'half' inserted
pe += 1
if pe > 2:
if (pe > 2):
raise
# Timed out ...
@@ -104,7 +105,7 @@ class SerialSimLink(LinkBase):
rv = self._reset_card()
if rv == 0:
raise NoCardError()
if rv < 0:
elif rv < 0:
raise ProtocolError()
return rv
@@ -119,8 +120,8 @@ class SerialSimLink(LinkBase):
try:
rst_meth = rst_meth_map[self._rst_pin[1:]]
rst_val = rst_val_map[self._rst_pin[0]]
except Exception as exc:
raise ValueError('Invalid reset pin %s' % self._rst_pin) from exc
except:
raise ValueError('Invalid reset pin %s' % self._rst_pin)
rst_meth(rst_val)
time.sleep(0.1) # 100 ms
@@ -202,7 +203,7 @@ class SerialSimLink(LinkBase):
b = self._rx_byte()
if ord(b) == pdu[1]:
break
if b != '\x60':
elif b != '\x60':
# Ok, it 'could' be SW1
sw1 = b
sw2 = self._rx_byte()
@@ -221,7 +222,7 @@ class SerialSimLink(LinkBase):
to_recv = data_len - len(pdu) + 5 + 2
data = bytes(0)
while len(data) < to_recv:
while (len(data) < to_recv):
b = self._rx_byte()
if (to_recv == 2) and (b == '\x60'): # Ignore NIL if we have no RX data (hack ?)
continue

View File

@@ -1,7 +1,7 @@
# coding=utf-8
"""Utilities / Functions related to ETSI TS 102 221, the core UICC spec.
(C) 2021-2024 by Harald Welte <laforge@osmocom.org>
(C) 2021 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -16,22 +16,22 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from bidict import bidict
from construct import Select, Const, Bit, Struct, Int16ub, FlagsEnum, GreedyString, ValidationError
from construct import *
from construct import Optional as COptional
from pySim.construct import *
from pySim.utils import *
from pySim.filesystem import *
from pySim.tlv import *
from bidict import bidict
from pySim.profile import CardProfile
from pySim.profile import match_uicc
from pySim import iso7816_4
from pySim.profile import match_sim
import pySim.iso7816_4 as iso7816_4
# A UICC will usually also support 2G functionality. If this is the case, we
# need to add DF_GSM and DF_TELECOM along with the UICC related files
from pySim.ts_51_011 import AddonSIM
from pySim.ts_51_011 import DF_GSM, DF_TELECOM, AddonSIM
from pySim.gsm_r import AddonGSMR
from pySim.cdma_ruim import AddonRUIM
@@ -80,10 +80,6 @@ ts_102_22x_cmdset = CardCommandSet('TS 102 22x', [
CardCommand('RESIZE FILE', 0xD4, ['8X', 'CX']),
])
# ETSI TS 102 221 6.2.1
SupplyVoltageClasses = FlagsEnum(Int8ub, a=0x1, b=0x2, c=0x4, d=0x8, e=0x10)
# ETSI TS 102 221 11.1.1.4.2
class FileSize(BER_TLV_IE, tag=0x80):
_construct = GreedyInteger(minlen=2)
@@ -135,7 +131,7 @@ class UiccCharacteristics(BER_TLV_IE, tag=0x80):
# ETSI TS 102 221 11.1.1.4.6.2
class ApplicationPowerConsumption(BER_TLV_IE, tag=0x81):
_construct = Struct('voltage_class'/SupplyVoltageClasses,
_construct = Struct('voltage_class'/Int8ub,
'power_consumption_ma'/Int8ub,
'reference_freq_100k'/Int8ub)
@@ -238,19 +234,20 @@ class LifeCycleStatusInteger(BER_TLV_IE, tag=0x8A):
def _to_bytes(self):
if self.decoded == 'no_information':
return b'\x00'
if self.decoded == 'creation':
elif self.decoded == 'creation':
return b'\x01'
if self.decoded == 'initialization':
elif self.decoded == 'initialization':
return b'\x03'
if self.decoded == 'operational_activated':
elif self.decoded == 'operational_activated':
return b'\x05'
if self.decoded == 'operational_deactivated':
elif self.decoded == 'operational_deactivated':
return b'\x04'
if self.decoded == 'termination':
elif self.decoded == 'termination':
return b'\x0c'
if isinstance(self.decoded, int):
elif isinstance(self.decoded, int):
return self.decoded.to_bytes(1, 'big')
raise ValueError
else:
raise ValueError
# ETSI TS 102 221 11.1.1.4.9
class PS_DO(BER_TLV_IE, tag=0x90):
@@ -287,33 +284,6 @@ def tlv_val_interpret(inmap, indata):
return val
return {d[0]: newval(inmap, d[0], d[1]) for d in indata.items()}
# TS 102 221 11.1.19.2.1
class TerminalPowerSupply(BER_TLV_IE, tag=0x80):
_construct = Struct('used_supply_voltage_class'/SupplyVoltageClasses,
'maximum_available_power_supply'/Int8ub,
'actual_used_freq_100k'/Int8ub)
# TS 102 221 11.1.19.2.2
class ExtendedLchanTerminalSupport(BER_TLV_IE, tag=0x81):
_construct = GreedyBytes
# TS 102 221 11.1.19.2.3
class AdditionalInterfacesSupport(BER_TLV_IE, tag=0x82):
_construct = FlagsEnum(Int8ub, uicc_clf=0x01)
# TS 102 221 11.1.19.2.4 + SGP.32 v3.0 3.4.2 RSP Device Capabilities
class AdditionalTermCapEuicc(BER_TLV_IE, tag=0x83):
_construct = FlagsEnum(Int8ub, lui_d=0x01, lpd_d=0x02, lds_d=0x04, lui_e_scws=0x08,
metadata_update_alerting=0x10,
enterprise_capable_device=0x20,
lui_e_e4e=0x40,
lpr=0x80)
# TS 102 221 11.1.19.2.0
class TerminalCapability(BER_TLV_IE, tag=0xa9, nested=[TerminalPowerSupply, ExtendedLchanTerminalSupport,
AdditionalInterfacesSupport, AdditionalTermCapEuicc]):
pass
# ETSI TS 102 221 Section 9.2.7 + ISO7816-4 9.3.3/9.3.4
class _AM_DO_DF(DataObject):
def __init__(self):
@@ -511,11 +481,12 @@ class CRT_DO(DataObject):
def from_bytes(self, do: bytes):
"""Decode a Control Reference Template DO."""
if len(do) != 6:
raise ValueError('Unsupported CRT DO length: %s' %do)
raise ValueError('Unsupported CRT DO length: %s', do)
if do[0] != 0x83 or do[1] != 0x01:
raise ValueError('Unsupported Key Ref Tag or Len in CRT DO %s' % do)
raise ValueError('Unsupported Key Ref Tag or Len in CRT DO %s', do)
if do[3:] != b'\x95\x01\x08':
raise ValueError('Unsupported Usage Qualifier Tag or Len in CRT DO %s' % do)
raise ValueError(
'Unsupported Usage Qualifier Tag or Len in CRT DO %s', do)
self.encoded = do[0:6]
self.decoded = pin_names[do[2]]
return do[6:]
@@ -549,7 +520,7 @@ class SecCondByte_DO(DataObject):
if inb & 0x10:
res.append('user_auth')
rd = {'mode': cond}
if len(res) > 0:
if len(res):
rd['conditions'] = res
self.decoded = rd
@@ -752,7 +723,7 @@ class EF_ARR(LinFixedEF):
raise ValueError
return by_mode
def _decode_record_bin(self, raw_bin_data, **_kwargs):
def _decode_record_bin(self, raw_bin_data, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
dec = arr_seq.decode_multi(raw_bin_data)
@@ -760,17 +731,20 @@ class EF_ARR(LinFixedEF):
# 'un-flattening' decoder, and hence would be unable to encode :(
return dec[0]
def _encode_record_bin(self, in_json, **_kwargs):
def _encode_record_bin(self, in_json, **kwargs):
# we can only guess if we should decode for EF or DF here :(
arr_seq = DataObjectSequence('arr', sequence=[AM_DO_EF, SC_DO])
return arr_seq.encode_multi(in_json)
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
@cmd2.with_argparser(LinFixedEF.ShellCommands.read_rec_dec_parser)
def do_read_arr_record(self, opts):
"""Read one EF.ARR record in flattened, human-friendly form."""
(data, _sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
(data, sw) = self._cmd.lchan.read_record_dec(opts.record_nr)
data = self._cmd.lchan.selected_file.flatten(data)
self._cmd.poutput_json(data, opts.oneline)
@@ -781,7 +755,7 @@ class EF_ARR(LinFixedEF):
# collect all results in list so they are rendered as JSON list when printing
data_list = []
for recnr in range(1, 1 + num_of_rec):
(data, _sw) = self._cmd.lchan.read_record_dec(recnr)
(data, sw) = self._cmd.lchan.read_record_dec(recnr)
data = self._cmd.lchan.selected_file.flatten(data)
data_list.append(data)
self._cmd.poutput_json(data_list, opts.oneline)
@@ -895,10 +869,10 @@ class CardProfileUICC(CardProfile):
shell_cmdsets = [self.AddlShellCommands()], addons = addons)
@staticmethod
def decode_select_response(data_hex: str) -> object:
def decode_select_response(resp_hex: str) -> object:
"""ETSI TS 102 221 Section 11.1.1.3"""
t = FcpTemplate()
t.from_tlv(h2b(data_hex))
t.from_tlv(h2b(resp_hex))
d = t.to_dict()
return flatten_dict_lists(d['fcp_template'])
@@ -932,81 +906,3 @@ class CardProfileUICC(CardProfile):
of the card is required between SUSPEND and RESUME, and only very few non-RESUME
commands are permitted between SUSPEND and RESUME. See TS 102 221 Section 11.1.22."""
self._cmd.card._scc.resume_uicc(opts.token)
term_cap_parser = argparse.ArgumentParser()
# power group
tc_power_grp = term_cap_parser.add_argument_group('Terminal Power Supply')
tc_power_grp.add_argument('--used-supply-voltage-class', type=str, choices=['a','b','c','d','e'],
help='Actual used Supply voltage class')
tc_power_grp.add_argument('--maximum-available-power-supply', type=auto_uint8,
help='Maximum available power supply of the terminal')
tc_power_grp.add_argument('--actual-used-freq-100k', type=auto_uint8,
help='Actual used clock frequency (in units of 100kHz)')
# no separate groups for those two
tc_elc_grp = term_cap_parser.add_argument_group('Extended logical channels terminal support')
tc_elc_grp.add_argument('--extended-logical-channel', action='store_true',
help='Extended Logical Channel supported')
tc_aif_grp = term_cap_parser.add_argument_group('Additional interfaces support')
tc_aif_grp.add_argument('--uicc-clf', action='store_true',
help='Local User Interface in the Device (LUId) supported')
# eUICC group
tc_euicc_grp = term_cap_parser.add_argument_group('Additional Terminal capability indications related to eUICC')
tc_euicc_grp.add_argument('--lui-d', action='store_true',
help='Local User Interface in the Device (LUId) supported')
tc_euicc_grp.add_argument('--lpd-d', action='store_true',
help='Local Profile Download in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lds-d', action='store_true',
help='Local Discovery Service in the Device (LPDd) supported')
tc_euicc_grp.add_argument('--lui-e-scws', action='store_true',
help='LUIe based on SCWS supported')
tc_euicc_grp.add_argument('--metadata-update-alerting', action='store_true',
help='Metadata update alerting supported')
tc_euicc_grp.add_argument('--enterprise-capable-device', action='store_true',
help='Enterprise Capable Device')
tc_euicc_grp.add_argument('--lui-e-e4e', action='store_true',
help='LUIe using E4E (ENVELOPE tag E4) supported')
tc_euicc_grp.add_argument('--lpr', action='store_true',
help='LPR (LPA Proxy) supported')
@cmd2.with_argparser(term_cap_parser)
def do_terminal_capability(self, opts):
"""Perform the TERMINAL CAPABILITY function. Used to inform the UICC about terminal capability."""
ps_flags = {}
addl_if_flags = {}
euicc_flags = {}
opts_dict = vars(opts)
power_items = ['used_supply_voltage_class', 'maximum_available_power_supply', 'actual_used_freq_100k']
if any(opts_dict[x] for x in power_items):
if not all(opts_dict[x] for x in power_items):
raise argparse.ArgumentTypeError('If any of the Terminal Power Supply group options are used, all must be specified')
for k, v in opts_dict.items():
if k in AdditionalInterfacesSupport._construct.flags.keys():
addl_if_flags[k] = v
elif k in AdditionalTermCapEuicc._construct.flags.keys():
euicc_flags[k] = v
elif k in [f.name for f in TerminalPowerSupply._construct.subcons]:
if k == 'used_supply_voltage_class' and v:
v = {v: True}
ps_flags[k] = v
child_list = []
if any(x for x in ps_flags.values()):
child_list.append(TerminalPowerSupply(decoded=ps_flags))
if opts.extended_logical_channel:
child_list.append(ExtendedLchanTerminalSupport())
if any(x for x in addl_if_flags.values()):
child_list.append(AdditionalInterfacesSupport(decoded=addl_if_flags))
if any(x for x in euicc_flags.values()):
child_list.append(AdditionalTermCapEuicc(decoded=euicc_flags))
print(child_list)
tc = TerminalCapability(children=child_list)
self.terminal_capability(b2h(tc.to_tlv()))
def terminal_capability(self, data:Hexstr):
cmd_hex = "80AA0000%02x%s" % (len(data)//2, data)
_rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)

View File

@@ -18,12 +18,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List
import argparse
import cmd2
from cmd2 import CommandSet, with_default_category
from cmd2 import CommandSet, with_default_category, with_argparser
import argparse
from pySim.utils import b2h, auto_uint8, auto_uint16, is_hexstr
from pySim.exceptions import *
from pySim.utils import h2b, swap_nibbles, b2h, JsonEncoder, auto_uint8, auto_uint16
from pySim.ts_102_221 import *
@@ -31,6 +32,9 @@ from pySim.ts_102_221 import *
class Ts102222Commands(CommandSet):
"""Administrative commands for telecommunication applications."""
def __init__(self):
super().__init__()
delfile_parser = argparse.ArgumentParser()
delfile_parser.add_argument('--force-delete', action='store_true',
help='I really want to permanently delete the file. I know pySim cannot re-create it yet!')
@@ -45,7 +49,7 @@ class Ts102222Commands(CommandSet):
self._cmd.perror("Refusing to permanently delete the file, please read the help text.")
return
f = self._cmd.lchan.get_file_for_selectable(opts.NAME)
(_data, _sw) = self._cmd.lchan.scc.delete_file(f.fid)
(data, sw) = self._cmd.lchan.scc.delete_file(f.fid)
def complete_delete_file(self, text, line, begidx, endidx) -> List[str]:
"""Command Line tab completion for DELETE FILE"""
@@ -66,7 +70,7 @@ class Ts102222Commands(CommandSet):
self._cmd.perror("Refusing to terminate the file, please read the help text.")
return
f = self._cmd.lchan.get_file_for_selectable(opts.NAME)
(_data, _sw) = self._cmd.lchan.scc.terminate_df(f.fid)
(data, sw) = self._cmd.lchan.scc.terminate_df(f.fid)
def complete_terminate_df(self, text, line, begidx, endidx) -> List[str]:
"""Command Line tab completion for TERMINATE DF"""
@@ -82,7 +86,7 @@ class Ts102222Commands(CommandSet):
self._cmd.perror("Refusing to terminate the file, please read the help text.")
return
f = self._cmd.lchan.get_file_for_selectable(opts.NAME)
(_data, _sw) = self._cmd.lchan.scc.terminate_ef(f.fid)
(data, sw) = self._cmd.lchan.scc.terminate_ef(f.fid)
def complete_terminate_ef(self, text, line, begidx, endidx) -> List[str]:
"""Command Line tab completion for TERMINATE EF"""
@@ -100,7 +104,7 @@ class Ts102222Commands(CommandSet):
if not opts.force_terminate_card:
self._cmd.perror("Refusing to permanently terminate the card, please read the help text.")
return
(_data, _sw) = self._cmd.lchan.scc.terminate_card_usage()
(data, sw) = self._cmd.lchan.scc.terminate_card_usage()
create_parser = argparse.ArgumentParser()
create_parser.add_argument('FILE_ID', type=is_hexstr, help='File Identifier as 4-character hex string')
@@ -145,7 +149,7 @@ class Ts102222Commands(CommandSet):
ShortFileIdentifier(decoded=opts.short_file_id),
]
fcp = FcpTemplate(children=ies)
(_data, _sw) = self._cmd.lchan.scc.create_file(b2h(fcp.to_tlv()))
(data, sw) = self._cmd.lchan.scc.create_file(b2h(fcp.to_tlv()))
# the newly-created file is automatically selected but our runtime state knows nothing of it
self._cmd.lchan.select_file(self._cmd.lchan.selected_file)
@@ -188,15 +192,15 @@ class Ts102222Commands(CommandSet):
ies.append(TotalFileSize(decoded=opts.total_file_size))
# TODO: Spec states PIN Status Template DO is mandatory
if opts.permit_rfm_create or opts.permit_rfm_delete_terminate or opts.permit_other_applet_create or opts.permit_other_applet_delete_terminate:
toolkit_ac = {
'rfm_create': opts.permit_rfm_create,
'rfm_delete_terminate': opts.permit_rfm_delete_terminate,
'other_applet_create': opts.permit_other_applet_create,
'other_applet_delete_terminate': opts.permit_other_applet_delete_terminate,
}
ies.append(ProprietaryInformation(children=[ToolkitAccessConditions(decoded=toolkit_ac)]))
toolkit_ac = {
'rfm_create': opts.permit_rfm_create,
'rfm_delete_terminate': opts.permit_rfm_delete_terminate,
'other_applet_create': opts.permit_other_applet_create,
'other_applet_delete_terminate': opts.permit_other_applet_delete_terminate,
}
ies.append(ProprietaryInformation(children=[ToolkitAccessConditions(decoded=toolkit_ac)]))
fcp = FcpTemplate(children=ies)
(_data, _sw) = self._cmd.lchan.scc.create_file(b2h(fcp.to_tlv()))
(data, sw) = self._cmd.lchan.scc.create_file(b2h(fcp.to_tlv()))
# the newly-created file is automatically selected but our runtime state knows nothing of it
self._cmd.lchan.select_file(self._cmd.lchan.selected_file)
@@ -213,7 +217,7 @@ class Ts102222Commands(CommandSet):
ies = [FileIdentifier(decoded=f.fid),
FileSize(decoded=opts.file_size)]
fcp = FcpTemplate(children=ies)
(_data, _sw) = self._cmd.lchan.scc.resize_file(b2h(fcp.to_tlv()))
(data, sw) = self._cmd.lchan.scc.resize_file(b2h(fcp.to_tlv()))
# the resized file is automatically selected but our runtime state knows nothing of it
self._cmd.lchan.select_file(self._cmd.lchan.selected_file)

View File

@@ -10,7 +10,7 @@ Various constants from 3GPP TS 31.102 V17.9.0
#
# Copyright (C) 2020 Supreeth Herle <herlesupreeth@gmail.com>
# Copyright (C) 2021-2024 Harald Welte <laforge@osmocom.org>
# Copyright (C) 2021-2023 Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -26,12 +26,7 @@ Various constants from 3GPP TS 31.102 V17.9.0
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import enum
from construct import Optional as COptional
from construct import Int32ub, Nibble, GreedyRange, Struct, FlagsEnum, Switch, this, Int16ub, Padding
from construct import Bytewise, Int24ub, PaddedString
# Mapping between USIM Service Number and its description
import pySim.ts_102_221
from pySim.ts_51_011 import EF_ACMmax, EF_AAeM, EF_eMLPP, EF_CMI, EF_PNN
from pySim.ts_51_011 import EF_MMSN, EF_MMSICP, EF_MMSUP, EF_MMSUCP, EF_VGCS, EF_VGCSS, EF_NIA
@@ -46,8 +41,11 @@ from pySim.ts_31_102_telecom import DF_PHONEBOOK, EF_UServiceTable
from pySim.construct import *
from pySim.utils import is_hexstr
from pySim.cat import SMS_TPDU, DeviceIdentities, SMSPPDownload
# Mapping between USIM Service Number and its description
from construct import Optional as COptional
from construct import *
from typing import Tuple
from struct import unpack, pack
import enum
EF_UST_map = {
1: 'Local Phone Book',
2: 'Fixed Dialling Numbers (FDN)',
@@ -402,15 +400,15 @@ class EF_LI(TransRecEF):
desc='Language Indication'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len)
def _decode_record_bin(self, in_bin, **_kwargs):
def _decode_record_bin(self, in_bin, **kwargs):
if in_bin == b'\xff\xff':
return None
else:
# officially this is 7-bit GSM alphabet with one padding bit in each byte
return in_bin.decode('ascii')
def _encode_record_bin(self, in_json, **_kwargs):
if in_json is None:
def _encode_record_bin(self, in_json, **kwargs):
if in_json == None:
return b'\xff\xff'
else:
# officially this is 7-bit GSM alphabet with one padding bit in each byte
@@ -440,6 +438,9 @@ class EF_UST(EF_UServiceTable):
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
def do_ust_service_activate(self, arg):
"""Activate a service within EF.UST"""
selected_file = self._cmd.lchan.selected_file
@@ -450,7 +451,7 @@ class EF_UST(EF_UServiceTable):
selected_file = self._cmd.lchan.selected_file
selected_file.ust_update(self._cmd, [], [int(arg)])
def do_ust_service_check(self, _arg):
def do_ust_service_check(self, arg):
"""Check consistency between services of this file and files present/activated.
Many services determine if one or multiple files shall be present/activated or if they shall be
@@ -502,7 +503,7 @@ class EF_ECC(LinFixedEF):
desc='Emergency Call Codes'):
super().__init__(fid, sfid=sfid, name=name, desc=desc, rec_len=(4, 20))
def _decode_record_bin(self, in_bin, **_kwargs):
def _decode_record_bin(self, in_bin, **kwargs):
# mandatory parts
code = in_bin[:3]
if code == b'\xff\xff\xff':
@@ -516,7 +517,7 @@ class EF_ECC(LinFixedEF):
ret['alpha_id'] = parse_construct(EF_ECC.alpha_construct, alpha_id)
return ret
def _encode_record_bin(self, in_json, **_kwargs):
def _encode_record_bin(self, in_json, **kwargs):
if in_json is None:
return b'\xff\xff\xff\xff'
code = EF_ECC.cc_construct.build(in_json['call_code'])
@@ -637,6 +638,9 @@ class EF_EST(EF_UServiceTable):
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
def do_est_service_enable(self, arg):
"""Enable a service within EF.EST"""
selected_file = self._cmd.lchan.selected_file
@@ -679,7 +683,7 @@ class EF_RPLMNAcT(TransRecEF):
def __init__(self, fid='6f65', sfid=None, name='EF.RPLMNAcTD', size=(2, 4), rec_len=2,
desc='RPLMN Last used Access Technology', **kwargs):
super().__init__(fid, sfid=sfid, name=name, desc=desc, size=size, rec_len=rec_len, **kwargs)
def _decode_record_hex(self, in_hex, **_kwargs):
def _decode_record_hex(self, in_hex, **kwargs):
return dec_act(in_hex)
# TODO: Encode
@@ -1135,6 +1139,9 @@ class EF_5G_PROSE_ST(EF_UServiceTable):
@with_default_category('File-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
def do_prose_service_activate(self, arg):
"""Activate a service within EF.5G_PROSE_ST"""
selected_file = self._cmd.lchan.selected_file
@@ -1443,13 +1450,14 @@ class DF_SAIP(CardDF):
class ADF_USIM(CardADF):
def __init__(self, aid='a0000000871002', has_fs=True, name='ADF.USIM', fid=None, sfid=None,
desc='USIM Application', has_imsi=True):
desc='USIM Application'):
super().__init__(aid=aid, has_fs=has_fs, fid=fid, sfid=sfid, name=name, desc=desc)
# add those commands to the general commands of a TransparentEF
self.shell_commands += [self.AddlShellCommands()]
files = [
EF_LI(sfid=0x02),
EF_IMSI(sfid=0x07),
EF_Keys(),
EF_Keys('6f09', 0x09, 'EF.KeysPS',
desc='Ciphering and Integrity Keys for PS domain'),
@@ -1570,10 +1578,6 @@ class ADF_USIM(CardADF):
DF_5G_ProSe(service=139),
DF_SAIP(),
]
if has_imsi:
files.append(EF_IMSI(sfid=0x07))
self.add_files(files)
def decode_select_response(self, data_hex):
@@ -1581,6 +1585,9 @@ class ADF_USIM(CardADF):
@with_default_category('Application-Specific Commands')
class AddlShellCommands(CommandSet):
def __init__(self):
super().__init__()
authenticate_parser = argparse.ArgumentParser()
authenticate_parser.add_argument('rand', type=is_hexstr, help='Random challenge')
authenticate_parser.add_argument('autn', type=is_hexstr, help='Authentication Nonce')
@@ -1589,7 +1596,7 @@ class ADF_USIM(CardADF):
@cmd2.with_argparser(authenticate_parser)
def do_authenticate(self, opts):
"""Perform Authentication and Key Agreement (AKA)."""
(data, _sw) = self._cmd.lchan.scc.authenticate(opts.rand, opts.autn)
(data, sw) = self._cmd.lchan.scc.authenticate(opts.rand, opts.autn)
self._cmd.poutput_json(data)
term_prof_parser = argparse.ArgumentParser()
@@ -1647,7 +1654,7 @@ class ADF_USIM(CardADF):
context = 0x01 # SUCI
if opts.nswo_context:
context = 0x02 # SUCI 5G NSWO
(data, _sw) = self._cmd.lchan.scc.get_identity(context)
(data, sw) = self._cmd.lchan.scc.get_identity(context)
do = SUCI_TlvDataObject()
do.from_tlv(h2b(data))
do_d = do.to_dict()
@@ -1669,10 +1676,3 @@ sw_usim = {
class CardApplicationUSIM(CardApplication):
def __init__(self):
super().__init__('USIM', adf=ADF_USIM(), sw=sw_usim)
# TS 31.102 Annex N + TS 102 220 Annex E
class CardApplicationUSIMnonIMSI(CardApplication):
def __init__(self):
adf = ADF_USIM(aid='a000000087100b', name='ADF.USIM-non-IMSI', has_imsi=False,
desc='3GPP USIM (non-IMSI SUPI Type) - TS 31.102 Annex N')
super().__init__('USIM-non-IMSI', adf=adf, sw=sw_usim)

View File

@@ -26,12 +26,11 @@ Needs to be a separate python module to avoid cyclic imports
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from construct import Optional as COptional
from construct import Struct, Int16ub, Int32ub
from pySim.tlv import *
from pySim.filesystem import *
from pySim.construct import *
from construct import Optional as COptional
from construct import *
# TS 31.102 Section 4.2.8
class EF_UServiceTable(TransparentEF):
@@ -43,7 +42,7 @@ class EF_UServiceTable(TransparentEF):
def _bit_byte_offset_for_service(service: int) -> Tuple[int, int]:
i = service - 1
byte_offset = i//8
bit_offset = i % 8
bit_offset = (i % 8)
return (byte_offset, bit_offset)
def _decode_bin(self, in_bin):
@@ -74,7 +73,7 @@ class EF_UServiceTable(TransparentEF):
service_nr = int(srv)
(byte_offset, bit_offset) = EF_UServiceTable._bit_byte_offset_for_service(
service_nr)
if in_json[srv]['activated'] is True:
if in_json[srv]['activated'] == True:
bit = 1
else:
bit = 0
@@ -83,7 +82,7 @@ class EF_UServiceTable(TransparentEF):
def get_active_services(self, cmd):
# obtain list of currently active services
(service_data, _sw) = cmd.lchan.read_binary_dec()
(service_data, sw) = cmd.lchan.read_binary_dec()
active_services = []
for s in service_data.keys():
if service_data[s]['activated']:
@@ -122,7 +121,7 @@ class EF_UServiceTable(TransparentEF):
return num_problems
def ust_update(self, cmd, activate=[], deactivate=[]):
service_data, _sw = cmd.lchan.read_binary()
service_data, sw = cmd.lchan.read_binary()
service_data = h2b(service_data)
for service in activate:

View File

@@ -22,7 +22,6 @@ Various constants from 3GPP TS 31.103 V16.1.0
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from construct import Struct, Switch, this, Bytes, GreedyString
from pySim.filesystem import *
from pySim.utils import *
from pySim.tlv import *

View File

@@ -9,7 +9,7 @@ import string
import datetime
import argparse
from io import BytesIO
from typing import Optional, List, Dict, Any, Tuple, NewType, Union
from typing import Optional, List, Dict, Any, Tuple, NewType
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
@@ -150,12 +150,12 @@ def comprehensiontlv_parse_tag(binary: bytes) -> Tuple[dict, bytes]:
# three-byte tag
tag = (binary[1] & 0x7f) << 8
tag |= binary[2]
compr = bool(binary[1] & 0x80)
compr = True if binary[1] & 0x80 else False
return ({'comprehension': compr, 'tag': tag}, binary[3:])
else:
# single byte tag
tag = binary[0] & 0x7f
compr = bool(binary[0] & 0x80)
compr = True if binary[0] & 0x80 else False
return ({'comprehension': compr, 'tag': tag}, binary[1:])
@@ -163,7 +163,7 @@ def comprehensiontlv_encode_tag(tag) -> bytes:
"""Encode a single Tag according to ETSI TS 101 220 Section 7.1.1"""
# permit caller to specify tag also as integer value
if isinstance(tag, int):
compr = bool(tag < 0xff and tag & 0x80)
compr = True if tag < 0xff and tag & 0x80 else False
tag = {'tag': tag, 'comprehension': compr}
compr = tag.get('comprehension', False)
if tag['tag'] in [0x00, 0x80, 0xff] or tag['tag'] > 0xff:
@@ -219,7 +219,7 @@ def bertlv_parse_tag_raw(binary: bytes) -> Tuple[int, bytes]:
i = 1
last = False
while not last:
last = not bool(binary[i] & 0x80)
last = False if binary[i] & 0x80 else True
tag <<= 8
tag |= binary[i]
i += 1
@@ -234,7 +234,7 @@ def bertlv_parse_tag(binary: bytes) -> Tuple[dict, bytes]:
Tuple of ({class:int, constructed:bool, tag:int}, remainder:bytes)
"""
cls = binary[0] >> 6
constructed = bool(binary[0] & 0x20)
constructed = True if binary[0] & 0x20 else False
tag = binary[0] & 0x1f
if tag <= 30:
return ({'class': cls, 'constructed': constructed, 'tag': tag}, binary[1:])
@@ -243,7 +243,7 @@ def bertlv_parse_tag(binary: bytes) -> Tuple[dict, bytes]:
i = 1
last = False
while not last:
last = not bool(binary[i] & 0x80)
last = False if binary[i] & 0x80 else True
tag <<= 7
tag |= binary[i] & 0x7f
i += 1
@@ -276,7 +276,7 @@ def bertlv_encode_tag(t) -> bytes:
if isinstance(t, int):
# first convert to a dict representation
tag_size = count_int_bytes(t)
t, _remainder = bertlv_parse_tag(t.to_bytes(tag_size, 'big'))
t, remainder = bertlv_parse_tag(t.to_bytes(tag_size, 'big'))
tag = t['tag']
constructed = t['constructed']
cls = t['class']
@@ -446,30 +446,6 @@ def dec_iccid(ef: Hexstr) -> str:
def enc_iccid(iccid: str) -> Hexstr:
return swap_nibbles(rpad(iccid, 20))
def sanitize_iccid(iccid: Union[int, str]) -> str:
iccid = str(iccid)
if len(iccid) < 18:
raise ValueError('ICCID input value must be at least 18 digits')
if len(iccid) > 20:
raise ValueError('ICCID input value must be at most 20 digits')
if len(iccid) == 18:
# 18 digits means we must add a luhn check digit to reach 19 digits
iccid += str(calculate_luhn(iccid))
if len(iccid) == 20:
# 20 digits means we're actually exceeding E.118 by one digit, and
# the luhn check digit must already be included
verify_luhn(iccid)
if len(iccid) == 19:
# 19 digits means that it's either an in-spec 19-digits ICCID with
# its luhn check digit already present, or it's an out-of-spec 20-digit
# ICCID without that check digit...
try:
verify_luhn(iccid)
except ValueError:
# 19th digit was not luhn check digit; we must add it
iccid += str(calculate_luhn(iccid))
return iccid
def enc_plmn(mcc: Hexstr, mnc: Hexstr) -> Hexstr:
"""Converts integer MCC/MNC into 3 bytes for EF"""
@@ -562,7 +538,7 @@ def dec_act(twohexbytes: Hexstr) -> List[str]:
sel.add(a['name'])
# TS 31.102 Section 4.2.5 Table 4.2.5.1
eutran_bits = u16t & 0x7000
if eutran_bits in [0x4000, 0x7000]:
if eutran_bits == 0x4000 or eutran_bits == 0x7000:
sel.add("E-UTRAN WB-S1")
sel.add("E-UTRAN NB-S1")
elif eutran_bits == 0x5000:
@@ -571,7 +547,7 @@ def dec_act(twohexbytes: Hexstr) -> List[str]:
sel.add("E-UTRAN WB-S1")
# TS 31.102 Section 4.2.5 Table 4.2.5.2
gsm_bits = u16t & 0x008C
if gsm_bits in [0x0080, 0x008C]:
if gsm_bits == 0x0080 or gsm_bits == 0x008C:
sel.add("GSM")
sel.add("EC-GSM-IoT")
elif u16t & 0x008C == 0x0084:
@@ -630,17 +606,12 @@ def calculate_luhn(cc) -> int:
for d in num[::-2]]) % 10
return 0 if check_digit == 10 else check_digit
def verify_luhn(digits: str):
"""Verify the Luhn check digit; raises ValueError if it is incorrect."""
cd = calculate_luhn(digits[:-1])
if str(cd) != digits[-1]:
raise ValueError('Luhn check digit mismatch: should be %s but is %s' % (str(cd), digits[-1]))
def mcc_from_imsi(imsi: str) -> Optional[str]:
"""
Derive the MCC (Mobile Country Code) from the first three digits of an IMSI
"""
if imsi is None:
if imsi == None:
return None
if len(imsi) > 3:
@@ -653,7 +624,7 @@ def mnc_from_imsi(imsi: str, long: bool = False) -> Optional[str]:
"""
Derive the MNC (Mobile Country Code) from the 4th to 6th digit of an IMSI
"""
if imsi is None:
if imsi == None:
return None
if len(imsi) > 3:
@@ -759,7 +730,7 @@ def enc_msisdn(msisdn: str, npi: int = 0x01, ton: int = 0x03) -> Hexstr:
"""
# If no MSISDN is supplied then encode the file contents as all "ff"
if msisdn in ["", "+"]:
if msisdn == "" or msisdn == "+":
return "ff" * 14
# Leading '+' indicates International Number
@@ -800,7 +771,7 @@ def is_hex(string: str, minlen: int = 2, maxlen: Optional[int] = None) -> bool:
# Try actual encoding to be sure
try:
_try_encode = h2b(string)
try_encode = h2b(string)
return True
except:
return False
@@ -828,10 +799,12 @@ def sanitize_pin_adm(pin_adm, pin_adm_hex=None) -> Hexstr:
# Ensure that it's hex-encoded
try:
try_encode = h2b(pin_adm)
except ValueError as exc:
raise ValueError("PIN-ADM needs to be hex encoded using this option") from exc
except ValueError:
raise ValueError(
"PIN-ADM needs to be hex encoded using this option")
else:
raise ValueError("PIN-ADM needs to be exactly 16 digits (hex encoded)")
raise ValueError(
"PIN-ADM needs to be exactly 16 digits (hex encoded)")
return pin_adm
@@ -845,7 +818,7 @@ def get_addr_type(addr):
"""
# Empty address string
if len(addr) == 0:
if not len(addr):
return None
addr_list = addr.split('.')
@@ -860,7 +833,7 @@ def get_addr_type(addr):
return 0x01
elif ipa.version == 6:
return 0x02
except Exception:
except Exception as e:
invalid_ipv4 = True
for i in addr_list:
# Invalid IPv4 may qualify for a valid FQDN, so make check here
@@ -916,7 +889,7 @@ def tabulate_str_list(str_list, width: int = 79, hspace: int = 2, lspace: int =
Returns:
multi-line string containing formatted table
"""
if str_list is None:
if str_list == None:
return ""
if len(str_list) <= 0:
return ""
@@ -927,7 +900,7 @@ def tabulate_str_list(str_list, width: int = 79, hspace: int = 2, lspace: int =
table = []
for i in iter(range(rows)):
str_list_row = str_list[i::rows]
if align_left:
if (align_left):
format_str_cell = '%%-%ds'
else:
format_str_cell = '%%%ds'
@@ -1015,7 +988,7 @@ class JsonEncoder(json.JSONEncoder):
"""Extend the standard library JSONEncoder with support for more types."""
def default(self, o):
if isinstance(o, (BytesIO, bytes, bytearray)):
if isinstance(o, BytesIO) or isinstance(o, bytes) or isinstance(o, bytearray):
return b2h(o)
elif isinstance(o, datetime.datetime):
return o.isoformat()

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
from pySim.utils import b2h, h2b
from pySim.esim.saip import *
from pySim.esim.saip.validation import *
from pySim.pprint import HexBytesPrettyPrinter
pp = HexBytesPrettyPrinter(indent=4,width=500)
import abc
with open('smdpp-data/upp/TS48v2_SAIP2.3_NoBERTLV.der', 'rb') as f:
pes = ProfileElementSequence.from_der(f.read())
if False:
# iterate over each pe in the pes.pe_list
for pe in pes.pe_list:
print("="*70 + " " + pe.type)
pp.pprint(pe.decoded)
if False:
# sort by PE type and show all PE within that type
for pe_type in pes.pe_by_type.keys():
print("="*70 + " " + pe_type)
for pe in pes.pe_by_type[pe_type]:
pp.pprint(pe)
pp.pprint(pe.decoded)
checker = CheckBasicStructure()
checker.check(pes)
if False:
for naa in pes.pes_by_naa:
i = 0
for naa_instance in pes.pes_by_naa[naa]:
print("="*70 + " " + naa + str(i))
i += 1
for pe in naa_instance:
pp.pprint(pe.type)
for d in pe.decoded:
print(" %s" % d)
#pp.pprint(pe.decoded[d])
#if pe.type in ['akaParameter', 'pinCodes', 'pukCodes']:
# pp.pprint(pe.decoded)
from pySim.esim.saip.personalization import *
params = [Iccid('984944000000000000'), Imsi('901990123456789'),
Puk1(value='01234567'), Puk2(value='98765432'), Pin1('1111'), Pin2('2222'), Adm1('11111111'),
K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f')),
SdKeyScp80_01Kic(h2b('000102030405060708090a0b0c0d0e0f'))]
from pySim.esim.saip.templates import *
for p in params:
p.apply(pes)
if False:
for pe in pes:
pp.pprint(pe.decoded)
pass
if True:
naas = pes.pes_by_naa.keys()
for naa in naas:
for pe in pes.pes_by_naa[naa][0]:
print(pe)
#pp.pprint(pe.decoded)
#print(pe.header)
tpl_id = pe.templateID
if tpl_id:
prof = ProfileTemplateRegistry.get_by_oid(tpl_id)
print(prof)
#pp.pprint(pe.decoded)
for fname, fdata in pe.files.items():
print()
print("============== %s" % fname)
ftempl = None
if prof:
ftempl = prof.files_by_pename[fname]
print("Template: %s" % repr(ftempl))
print("Data: %s" % fdata)
file = File(fname, fdata, ftempl)
print(repr(file))
#pp.pprint(pe.files)
if True:
# iterate over each pe in the pes (using its __iter__ method)
for pe in pes:
print("="*70 + " " + pe.type)
pp.pprint(pe.decoded)
#print(ProfileTemplateRegistry.by_oid)

View File

@@ -3,8 +3,7 @@ from setuptools import setup
setup(
name='pySim',
version='1.0',
packages=['pySim', 'pySim.legacy', 'pySim.transport', 'pySim.apdu', 'pySim.apdu_source',
'pySim.esim'],
packages=['pySim', 'pySim.legacy', 'pySim.transport', 'pySim.apdu', 'pySim.apdu_source'],
url='https://osmocom.org/projects/pysim/wiki',
license='GPLv2',
author_email='simtrace@lists.osmocom.org',

Binary file not shown.

Binary file not shown.

View File

@@ -1,9 +1,7 @@
#!/usr/bin/env python3
import unittest
from pySim.utils import b2h, h2b
from pySim.construct import *
from construct import FlagsEnum
tests = [
( b'\x80', 0x80 ),
@@ -77,26 +75,6 @@ class TestUcs2Adapter(unittest.TestCase):
re_enc = self.ad._encode(string, None, None)
self.assertEqual(encoded, re_enc)
class TestTrailerAdapter(unittest.TestCase):
Privileges = FlagsEnum(StripTrailerAdapter(GreedyBytes, 3), security_domain=0x800000,
dap_verification=0x400000,
delegated_management=0x200000, card_lock=0x100000,
card_terminate=0x080000, card_reset=0x040000,
cvm_management=0x020000, mandated_dap_verification=0x010000,
trusted_path=0x8000, authorized_management=0x4000,
token_management=0x2000, global_delete=0x1000,
global_lock=0x0800, global_registry=0x0400,
final_application=0x0200, global_service=0x0100,
receipt_generation=0x80, ciphered_load_file_data_block=0x40,
contactless_activation=0x20, contactless_self_activation=0x10)
examples = ['00', '80', '8040', '400010']
def test_examples(self):
for e in self.examples:
dec = self.Privileges.parse(h2b(e))
reenc = self.Privileges.build(dec)
self.assertEqual(e, b2h(reenc))
if __name__ == "__main__":

View File

@@ -21,7 +21,6 @@ import copy
from pySim.utils import h2b, b2h
from pySim.esim.saip import *
from pySim.esim.saip.data_source import *
from pySim.esim.saip.personalization import *
from pprint import pprint as pp
@@ -56,56 +55,13 @@ class SaipTest(unittest.TestCase):
def test_personalization(self):
"""Test some of the personalization operations."""
pes = copy.deepcopy(self.pes)
params = [Puk1('01234567'), Puk2(98765432), Pin1('1111'), Pin2(2222), Adm1('11111111'),
params = [Puk1(value=b'01234567'), Puk2(value=b'98765432'), Pin1(b'1111'), Pin2(b'2222'), Adm1(b'11111111'),
K(h2b('000102030405060708090a0b0c0d0e0f')), Opc(h2b('101112131415161718191a1b1c1d1e1f'))]
for p in params:
p.validate()
p.apply(pes)
# TODO: we don't actually test the results here, but we just verify there is no exception
pes.to_der()
class DataSourceTest(unittest.TestCase):
def test_fixed(self):
FIXED = b'\x01\x02\x03'
ds = DataSourceFixed(FIXED)
self.assertEqual(ds.generate_one(), FIXED)
self.assertEqual(ds.generate_one(), FIXED)
self.assertEqual(ds.generate_one(), FIXED)
def test_incrementing(self):
BASE_VALUE = 100
ds = DataSourceIncrementing(BASE_VALUE)
self.assertEqual(ds.generate_one(), BASE_VALUE)
self.assertEqual(ds.generate_one(), BASE_VALUE+1)
self.assertEqual(ds.generate_one(), BASE_VALUE+2)
self.assertEqual(ds.generate_one(), BASE_VALUE+3)
def test_incrementing_step3(self):
BASE_VALUE = 300
ds = DataSourceIncrementing(BASE_VALUE, step_size=3)
self.assertEqual(ds.generate_one(), BASE_VALUE)
self.assertEqual(ds.generate_one(), BASE_VALUE+3)
self.assertEqual(ds.generate_one(), BASE_VALUE+6)
def test_random(self):
ds = DataSourceRandomBytes(8)
res = []
for i in range(0,100):
res.append(ds.generate_one())
for r in res:
self.assertEqual(len(r), 8)
# ensure no duplicates exist
self.assertEqual(len(set(res)), len(res))
def test_random_int(self):
ds = DataSourceRandomUInt(below=256)
res = []
for i in range(0,100):
res.append(ds.generate_one())
for r in res:
self.assertTrue(r < 256)
if __name__ == "__main__":
unittest.main()

View File

@@ -16,16 +16,16 @@ class TestEid(unittest.TestCase):
self.assertFalse(verify_eid_checksum(89049032123451234512345678901234))
def test_eid_encode_with_32_digits(self):
self.assertEqual(compute_eid_checksum('89049032123451234512345678901200'), '89049032123451234512345678901235')
self.assertEqual(compute_eid_checksum('89086030202200000022000023022900'), '89086030202200000022000023022943')
self.assertEquals(compute_eid_checksum('89049032123451234512345678901200'), '89049032123451234512345678901235')
self.assertEquals(compute_eid_checksum('89086030202200000022000023022900'), '89086030202200000022000023022943')
def test_eid_encode_with_30digits(self):
self.assertEqual(compute_eid_checksum('890490321234512345123456789012'), '89049032123451234512345678901235')
self.assertEquals(compute_eid_checksum('890490321234512345123456789012'), '89049032123451234512345678901235')
def test_eid_encode_with_wrong_csum(self):
# input: EID with wrong checksum
self.assertEqual(compute_eid_checksum('89049032123451234512345678901299'), '89049032123451234512345678901235')
self.assertEqual(compute_eid_checksum(89049032123451234512345678901299), '89049032123451234512345678901235')
self.assertEquals(compute_eid_checksum('89049032123451234512345678901299'), '89049032123451234512345678901235')
self.assertEquals(compute_eid_checksum(89049032123451234512345678901299), '89049032123451234512345678901235')
if __name__ == "__main__":
unittest.main()

View File

@@ -71,14 +71,6 @@ class SCP03_Test:
get_eid_cmd_plain = h2b('80E2910006BF3E035C015A')
get_eid_rsp_plain = h2b('bf3e125a1089882119900000000000000000000005')
# must be overridden by derived classes
init_upd_cmd = b''
init_upd_rsp = b''
ext_auth_cmd = b''
get_eid_cmd = b''
get_eid_rsp = b''
keyset = None
@property
def host_challenge(self) -> bytes:
return self.init_upd_cmd[5:]
@@ -110,22 +102,18 @@ class SCP03_Test:
cls.scp = SCP03(card_keys = cls.keyset)
def test_01_initialize_update(self):
# pylint: disable=no-member
self.assertEqual(self.init_upd_cmd, self.scp.gen_init_update_apdu(self.host_challenge))
def test_02_parse_init_upd_resp(self):
self.scp.parse_init_update_resp(self.init_upd_rsp)
def test_03_gen_ext_auth_apdu(self):
# pylint: disable=no-member
self.assertEqual(self.ext_auth_cmd, self.scp.gen_ext_auth_apdu(self.security_level))
def test_04_wrap_cmd_apdu_get_eid(self):
# pylint: disable=no-member
self.assertEqual(self.get_eid_cmd, self.scp.wrap_cmd_apdu(self.get_eid_cmd_plain))
def test_05_unwrap_rsp_apdu_get_eid(self):
# pylint: disable=no-member
self.assertEqual(self.get_eid_rsp_plain, self.scp.unwrap_rsp_apdu(h2b('9000'), self.get_eid_rsp))
@@ -214,12 +202,6 @@ class SCP03_Test_AES256_33(SCP03_Test, unittest.TestCase):
# FIXME: test auth with random (0x60) vs pseudo-random (0x70) challenge
class SCP03_KCV_Test(unittest.TestCase):
def test_kcv(self):
self.assertEqual(compute_kcv('aes', KEYSET_AES128.enc), h2b('C35280'))
self.assertEqual(compute_kcv('aes', KEYSET_AES128.mac), h2b('013808'))
self.assertEqual(compute_kcv('aes', KEYSET_AES128.dek), h2b('840DE5'))
if __name__ == "__main__":
unittest.main()

View File

@@ -270,8 +270,8 @@ class SmsOtaTestCase(OtaTestCase):
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=self.da, tp_pid=0x7F, tp_dcs=0xF6,
tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
#print("TPDU: %s" % tpdu)
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
#print("tpdu: %s" % b2h(tpdu.toBytes()))
self.assertEqual(b2h(tpdu.toBytes()), t['request']['encoded_tpdu'])
def test_decode_resp(self):
for t in SmsOtaTestCase.testdatasets:

View File

@@ -6,7 +6,7 @@ from pySim.sms import *
class Test_SMS_UDH(unittest.TestCase):
def test_single_ie(self):
udh, tail = UserDataHeader.from_bytes('027100')
udh, tail = UserDataHeader.fromBytes('027100')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
@@ -15,7 +15,7 @@ class Test_SMS_UDH(unittest.TestCase):
self.assertEqual(tail, b'')
def test_single_ie_tail(self):
udh, tail = UserDataHeader.from_bytes('027100abcdef')
udh, tail = UserDataHeader.fromBytes('027100abcdef')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
@@ -24,7 +24,7 @@ class Test_SMS_UDH(unittest.TestCase):
self.assertEqual(tail, b'\xab\xcd\xef')
def test_single_ie_value(self):
udh, tail = UserDataHeader.from_bytes('03710110')
udh, tail = UserDataHeader.fromBytes('03710110')
self.assertEqual(len(udh.ies), 1)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
@@ -33,7 +33,7 @@ class Test_SMS_UDH(unittest.TestCase):
self.assertEqual(tail, b'')
def test_two_ie_data_tail(self):
udh, tail = UserDataHeader.from_bytes('0571007001ffabcd')
udh, tail = UserDataHeader.fromBytes('0571007001ffabcd')
self.assertEqual(len(udh.ies), 2)
ie = udh.ies[0]
self.assertEqual(ie.iei, 0x71)
@@ -45,42 +45,42 @@ class Test_SMS_UDH(unittest.TestCase):
self.assertEqual(ie.value, b'\xff')
self.assertEqual(tail, b'\xab\xcd')
def test_to_bytes(self):
def test_toBytes(self):
indata = h2b('0571007001ff')
udh, tail = UserDataHeader.from_bytes(indata)
encoded = udh.to_bytes()
udh, tail = UserDataHeader.fromBytes(indata)
encoded = udh.toBytes()
self.assertEqual(encoded, indata)
class Test_AddressField(unittest.TestCase):
def test_from_bytes(self):
def test_fromBytes(self):
encoded = h2b('0480214399')
af, trailer = AddressField.from_bytes(encoded)
af, trailer = AddressField.fromBytes(encoded)
self.assertEqual(trailer, b'\x99')
self.assertEqual(af.ton, 'unknown')
self.assertEqual(af.npi, 'unknown')
self.assertEqual(af.digits, '1234')
def test_from_bytes_odd(self):
af, trailer = AddressField.from_bytes('038021f399')
def test_fromBytes_odd(self):
af, trailer = AddressField.fromBytes('038021f399')
self.assertEqual(trailer, b'\x99')
self.assertEqual(af.ton, 'unknown')
self.assertEqual(af.npi, 'unknown')
self.assertEqual(af.digits, '123')
def test_to_bytes(self):
def test_toBytes(self):
encoded = h2b('04802143')
af, trailer = AddressField.from_bytes(encoded)
self.assertEqual(af.to_bytes(), encoded)
af, trailer = AddressField.fromBytes(encoded)
self.assertEqual(af.toBytes(), encoded)
def test_to_bytes_odd(self):
def test_toBytes_odd(self):
af = AddressField('12345', 'international', 'isdn_e164')
encoded = af.to_bytes()
encoded = af.toBytes()
self.assertEqual(encoded, h2b('05912143f5'))
class Test_SUBMIT(unittest.TestCase):
def test_from_bytes(self):
s = SMS_SUBMIT.from_bytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
def test_fromBytes(self):
s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
self.assertEqual(s.tp_mti, 1)
self.assertEqual(s.tp_rd, True)
self.assertEqual(s.tp_vpf, 'relative')
@@ -92,8 +92,8 @@ class Test_SUBMIT(unittest.TestCase):
self.assertEqual(s.tp_udl, 140)
class Test_DELIVER(unittest.TestCase):
def test_from_bytes(self):
d = SMS_DELIVER.from_bytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
def test_fromBytes(self):
d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
self.assertEqual(d.tp_mti, 0)
self.assertEqual(d.tp_mms, True)
self.assertEqual(d.tp_lp, False)

View File

@@ -17,7 +17,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from construct import Int8ub
from pySim.tlv import *
class TestUtils(unittest.TestCase):

View File

@@ -236,22 +236,5 @@ class TestDgiTlv(unittest.TestCase):
self.assertEqual(utils.dgi_parse_len(b'\xfe\x0b'), (254, b'\x0b'))
self.assertEqual(utils.dgi_parse_len(b'\xff\x00\xff\x0b'), (255, b'\x0b'))
class TestLuhn(unittest.TestCase):
def test_verify(self):
utils.verify_luhn('8988211000000530082')
def test_encode(self):
self.assertEqual(utils.calculate_luhn('898821100000053008'), 2)
def test_sanitize_iccid(self):
# 19 digits with correct luhn; we expect no change
self.assertEqual(utils.sanitize_iccid('8988211000000530082'), '8988211000000530082')
# 20 digits with correct luhn; we expect no change
self.assertEqual(utils.sanitize_iccid('89882110000005300811'), '89882110000005300811')
# 19 digits without correct luhn; we expect check digit to be added
self.assertEqual(utils.sanitize_iccid('8988211000000530081'), '89882110000005300811')
# 18 digits; we expect luhn check digit to be added
self.assertEqual(utils.sanitize_iccid('898821100000053008'), '8988211000000530082')
if __name__ == "__main__":
unittest.main()