Compare commits
41 Commits
ewild/ossl
...
daniel/ota
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3506940448 | ||
|
|
637276472d | ||
|
|
a105b55751 | ||
|
|
3f3f4e20e2 | ||
|
|
c2fb84251b | ||
|
|
61541e7502 | ||
|
|
579214c4d0 | ||
|
|
4a7651eb65 | ||
|
|
01a6724153 | ||
|
|
a6ca5b7cd1 | ||
|
|
bcca2bffc2 | ||
|
|
e80f96cc3b | ||
|
|
4550574e03 | ||
|
|
08565e8a98 | ||
|
|
fb20b7bc58 | ||
|
|
52df66cd56 | ||
|
|
784cebded4 | ||
|
|
4f75aa1c8f | ||
|
|
94811ab585 | ||
|
|
f3e6e85f99 | ||
|
|
7f2cb157c8 | ||
|
|
f94f366cf9 | ||
|
|
4429e1cc70 | ||
|
|
1ab2f8dd9d | ||
|
|
e5f39fbd34 | ||
|
|
947154639c | ||
|
|
4ee99c18cd | ||
|
|
5d2e2ee259 | ||
|
|
92841f2cd5 | ||
|
|
caa955b3ac | ||
|
|
4dddcf932a | ||
|
|
10fe0e3aae | ||
|
|
076fec267a | ||
|
|
b4a12ecc14 | ||
|
|
6cffb31b42 | ||
|
|
6aed97d6c8 | ||
|
|
cb7d5aa3a7 | ||
|
|
70fedb5a46 | ||
|
|
7798ea9c5c | ||
|
|
0b1d3c85fd | ||
|
|
3c1a59640c |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -7,9 +7,10 @@
|
|||||||
/.local
|
/.local
|
||||||
/build
|
/build
|
||||||
/pySim.egg-info
|
/pySim.egg-info
|
||||||
/smdpp-data/sm-dp-sessions
|
/smdpp-data/sm-dp-sessions*
|
||||||
dist
|
dist
|
||||||
tags
|
tags
|
||||||
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
|
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_NIST.pem
|
||||||
|
smdpp-data/certs/DPtls/CERT_S_SM_DP_TLS_BRP.pem
|
||||||
smdpp-data/generated
|
smdpp-data/generated
|
||||||
smdpp-data/certs/dhparam2048.pem
|
smdpp-data/certs/dhparam2048.pem
|
||||||
|
|||||||
@@ -24,20 +24,12 @@ import argparse
|
|||||||
from Cryptodome.Cipher import AES
|
from Cryptodome.Cipher import AES
|
||||||
from osmocom.utils import h2b, b2h, Hexstr
|
from osmocom.utils import h2b, b2h, Hexstr
|
||||||
|
|
||||||
from pySim.card_key_provider import CardKeyProviderCsv
|
from pySim.card_key_provider import CardKeyFieldCryptor
|
||||||
|
|
||||||
def dict_keys_to_upper(d: dict) -> dict:
|
class CsvColumnEncryptor(CardKeyFieldCryptor):
|
||||||
return {k.upper():v for k,v in d.items()}
|
|
||||||
|
|
||||||
class CsvColumnEncryptor:
|
|
||||||
def __init__(self, filename: str, transport_keys: dict):
|
def __init__(self, filename: str, transport_keys: dict):
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
self.transport_keys = dict_keys_to_upper(transport_keys)
|
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||||
|
|
||||||
def encrypt_col(self, colname:str, value: str) -> Hexstr:
|
|
||||||
key = self.transport_keys[colname]
|
|
||||||
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
|
|
||||||
return b2h(cipher.encrypt(h2b(value)))
|
|
||||||
|
|
||||||
def encrypt(self) -> None:
|
def encrypt(self) -> None:
|
||||||
with open(self.filename, 'r') as infile:
|
with open(self.filename, 'r') as infile:
|
||||||
@@ -49,9 +41,8 @@ class CsvColumnEncryptor:
|
|||||||
cw.writeheader()
|
cw.writeheader()
|
||||||
|
|
||||||
for row in cr:
|
for row in cr:
|
||||||
for key_colname in self.transport_keys:
|
for fieldname in cr.fieldnames:
|
||||||
if key_colname in row:
|
row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname])
|
||||||
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
|
|
||||||
cw.writerow(row)
|
cw.writerow(row)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@@ -71,9 +62,5 @@ if __name__ == "__main__":
|
|||||||
print("You must specify at least one key!")
|
print("You must specify at least one key!")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
|
|
||||||
for name, key in csv_column_keys.items():
|
|
||||||
print("Encrypting column %s using AES key %s" % (name, key))
|
|
||||||
|
|
||||||
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
|
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
|
||||||
cce.encrypt()
|
cce.encrypt()
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ ICCID_HELP='The ICCID of the eSIM that shall be made available'
|
|||||||
MATCHID_HELP='MatchingID that shall be used by profile download'
|
MATCHID_HELP='MatchingID that shall be used by profile download'
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description="""
|
parser = argparse.ArgumentParser(description="""
|
||||||
Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
Utility to manually issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""")
|
||||||
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
|
parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint')
|
||||||
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
|
parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+')
|
||||||
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
|
parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server')
|
||||||
@@ -63,7 +63,7 @@ if __name__ == '__main__':
|
|||||||
data = {}
|
data = {}
|
||||||
for k, v in vars(opts).items():
|
for k, v in vars(opts).items():
|
||||||
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
|
if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']:
|
||||||
# remove keys from dict that shold not end up in JSON...
|
# remove keys from dict that should not end up in JSON...
|
||||||
continue
|
continue
|
||||||
if v is not None:
|
if v is not None:
|
||||||
data[k] = v
|
data[k] = v
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ parser_dl.add_argument('--confirmation-code',
|
|||||||
# notification
|
# notification
|
||||||
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
|
parser_ntf = subparsers.add_parser('notification', help='ES9+ (other) notification')
|
||||||
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
|
parser_ntf.add_argument('operation', choices=['enable','disable','delete'],
|
||||||
help='Profile Management Opreation whoise occurrence shall be notififed')
|
help='Profile Management Operation whoise occurrence shall be notififed')
|
||||||
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
|
parser_ntf.add_argument('--sequence-nr', type=int, required=True,
|
||||||
help='eUICC global notification sequence number')
|
help='eUICC global notification sequence number')
|
||||||
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
parser_ntf.add_argument('--notification-address', help='notificationAddress, if different from URL')
|
||||||
@@ -123,8 +123,8 @@ class Es9pClient:
|
|||||||
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
|
'profileManagementOperation': PMO(self.opts.operation).to_bitstring(),
|
||||||
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
|
'notificationAddress': self.opts.notification_address or urlparse(self.opts.url).netloc,
|
||||||
}
|
}
|
||||||
if opts.iccid:
|
if self.opts.iccid:
|
||||||
ntf_metadata['iccid'] = h2b(swap_nibbles(opts.iccid))
|
ntf_metadata['iccid'] = h2b(swap_nibbles(self.opts.iccid))
|
||||||
|
|
||||||
if self.opts.operation == 'download':
|
if self.opts.operation == 'download':
|
||||||
pird = {
|
pird = {
|
||||||
|
|||||||
40
contrib/esim_gen_metadata.py
Executable file
40
contrib/esim_gen_metadata.py
Executable file
@@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# (C) 2025 by Harald Welte <laforge@osmocom.org>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
from osmocom.utils import h2b, swap_nibbles
|
||||||
|
from pySim.esim.es8p import ProfileMetadata
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="""Utility program to generate profile metadata in the
|
||||||
|
StoreMetadataRequest format based on input values from the command line.""")
|
||||||
|
parser.add_argument('--iccid', required=True, help="ICCID of eSIM profile");
|
||||||
|
parser.add_argument('--spn', required=True, help="Service Provider Name");
|
||||||
|
parser.add_argument('--profile-name', required=True, help="eSIM Profile Name");
|
||||||
|
parser.add_argument('--profile-class', choices=['test', 'operational', 'provisioning'],
|
||||||
|
default='operational', help="Profile Class");
|
||||||
|
parser.add_argument('--outfile', required=True, help="Output File Name");
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
opts = parser.parse_args()
|
||||||
|
|
||||||
|
iccid_bin = h2b(swap_nibbles(opts.iccid))
|
||||||
|
pmd = ProfileMetadata(iccid_bin, spn=opts.spn, profile_name=opts.profile_name,
|
||||||
|
profile_class=opts.profile_class)
|
||||||
|
|
||||||
|
with open(opts.outfile, 'wb') as f:
|
||||||
|
f.write(pmd.gen_store_metadata_request())
|
||||||
|
print("Written StoreMetadataRequest to '%s'" % opts.outfile)
|
||||||
@@ -82,10 +82,6 @@ case "$JOB_TYPE" in
|
|||||||
|
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
|
|
||||||
# XXX: workaround for https://github.com/python-cmd2/cmd2/issues/1414
|
|
||||||
# 2.4.3 was the last stable release not affected by this bug (OS#6776)
|
|
||||||
pip install cmd2==2.4.3
|
|
||||||
|
|
||||||
rm -rf docs/_build
|
rm -rf docs/_build
|
||||||
make -C "docs" html latexpdf
|
make -C "docs" html latexpdf
|
||||||
|
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ parser_rpe.add_argument('--output-file', required=True, help='Output file name')
|
|||||||
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
|
parser_rpe.add_argument('--identification', default=[], type=int, action='append', help='Remove PEs matching specified identification')
|
||||||
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
|
parser_rpe.add_argument('--type', default=[], action='append', help='Remove PEs matching specified type')
|
||||||
|
|
||||||
parser_rn = subparsers.add_parser('remove-naa', help='Remove speciifed NAAs from PE-Sequence')
|
parser_rn = subparsers.add_parser('remove-naa', help='Remove specified NAAs from PE-Sequence')
|
||||||
parser_rn.add_argument('--output-file', required=True, help='Output file name')
|
parser_rn.add_argument('--output-file', required=True, help='Output file name')
|
||||||
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
|
parser_rn.add_argument('--naa-type', required=True, choices=NAAs.keys(), help='Network Access Application type to remove')
|
||||||
# TODO: add an --naa-index or the like, so only one given instance can be removed
|
# TODO: add an --naa-index or the like, so only one given instance can be removed
|
||||||
|
|||||||
@@ -27,5 +27,5 @@ PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH add-app-i
|
|||||||
# Display the contents of the resulting application PE:
|
# Display the contents of the resulting application PE:
|
||||||
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
|
PYTHONPATH=$PYSIMPATH python3 $PYSIMPATH/contrib/saip-tool.py $OUTPATH info --apps
|
||||||
|
|
||||||
# For an explaination of --uicc-toolkit-app-spec-pars, see:
|
# For an explanation of --uicc-toolkit-app-spec-pars, see:
|
||||||
# ETSI TS 102 226, section 8.2.1.3.2.2.1
|
# ETSI TS 102 226, section 8.2.1.3.2.2.1
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
# A more useful verion of the 'unber' tool provided with asn1c:
|
# A more useful version of the 'unber' tool provided with asn1c:
|
||||||
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
|
# Give a hierarchical decode of BER/DER-encoded ASN.1 TLVs
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|||||||
10
docs/conf.py
10
docs/conf.py
@@ -18,9 +18,17 @@ sys.path.insert(0, os.path.abspath('..'))
|
|||||||
# -- Project information -----------------------------------------------------
|
# -- Project information -----------------------------------------------------
|
||||||
|
|
||||||
project = 'osmopysim-usermanual'
|
project = 'osmopysim-usermanual'
|
||||||
copyright = '2009-2023 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
copyright = '2009-2025 by Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||||
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
author = 'Sylvain Munaut, Harald Welte, Philipp Maier, Supreeth Herle, Merlin Chlosta'
|
||||||
|
|
||||||
|
# PDF: Avoid that the authors list exceeds the page by inserting '\and'
|
||||||
|
# manually as line break (https://github.com/sphinx-doc/sphinx/issues/6875)
|
||||||
|
latex_elements = {
|
||||||
|
"maketitle":
|
||||||
|
r"""\author{Sylvain Munaut, Harald Welte, Philipp Maier, \and Supreeth Herle, Merlin Chlosta}
|
||||||
|
\sphinxmaketitle
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
# -- General configuration ---------------------------------------------------
|
# -- General configuration ---------------------------------------------------
|
||||||
|
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ Two modes are possible:
|
|||||||
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
|
Ki and OPc will be generated during each programming cycle. This means fresh keys are generated, even when the
|
||||||
``--num`` remains unchanged.
|
``--num`` remains unchanged.
|
||||||
|
|
||||||
The parameter ``--num`` specifies a card individual number. This number will be manged into the random seed so that
|
The parameter ``--num`` specifies a card individual number. This number will be managed into the random seed so that
|
||||||
it serves as an identifier for a particular set of randomly generated parameters.
|
it serves as an identifier for a particular set of randomly generated parameters.
|
||||||
|
|
||||||
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
|
In the example above the parameters ``--mcc``, and ``--mnc`` are specified as well, since they identify the GSM
|
||||||
@@ -77,7 +77,7 @@ the parameter ``--type``. The following card types are supported:
|
|||||||
|
|
||||||
Specifying the card reader:
|
Specifying the card reader:
|
||||||
|
|
||||||
It is most common to use ``pySim-prog`` together whith a PCSC reader. The PCSC reader number is specified via the
|
It is most common to use ``pySim-prog`` together with a PCSC reader. The PCSC reader number is specified via the
|
||||||
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
|
``--pcsc-device`` or ``-p`` option. However, other reader types (such as serial readers and modems) are supported. Use
|
||||||
the ``--help`` option of ``pySim-prog`` for more information.
|
the ``--help`` option of ``pySim-prog`` for more information.
|
||||||
|
|
||||||
|
|||||||
@@ -21,9 +21,9 @@ osmo-smdpp currently
|
|||||||
|
|
||||||
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
|
* [by default] uses test certificates copied from GSMA SGP.26 into `./smdpp-data/certs`, assuming that your
|
||||||
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
|
osmo-smdpp would be running at the host name `testsmdpplus1.example.com`. You can of course replace those
|
||||||
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with mathcing eUICCs.
|
certificates with your own, whether SGP.26 derived or part of a *private root CA* setup with matching eUICCs.
|
||||||
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
|
* doesn't understand profile state. Any profile can always be downloaded any number of times, irrespective
|
||||||
of the EID or whether it was donwloaded before. This is actually very useful for R&D and testing, as it
|
of the EID or whether it was downloaded before. This is actually very useful for R&D and testing, as it
|
||||||
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
|
doesn't require you to generate new profiles all the time. This logic of course is unsuitable for
|
||||||
production usage.
|
production usage.
|
||||||
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
|
* doesn't perform any personalization, so the IMSI/ICCID etc. are always identical (the ones that are stored in
|
||||||
@@ -40,16 +40,21 @@ osmo-smdpp currently
|
|||||||
Running osmo-smdpp
|
Running osmo-smdpp
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
osmo-smdpp does not have built-in TLS support as the used *twisted* framework appears to have
|
osmo-smdpp comes with built-in TLS support which is enabled by default. However, it is always possible to
|
||||||
problems when using the example elliptic curve certificates (both NIST and Brainpool) from GSMA.
|
disable the built-in TLS support if needed.
|
||||||
|
|
||||||
|
In order to use osmo-smdpp without the built-in TLS support, it has to be put behind a TLS reverse proxy,
|
||||||
|
which terminates the ES9+ HTTPS traffic from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
||||||
|
|
||||||
|
NOTE: The built in TLS support in osmo-smdpp makes use of the python *twisted* framework. Older versions
|
||||||
|
of this framework appear to have problems when using the example elliptic curve certificates (both NIST and
|
||||||
|
Brainpool) from GSMA.
|
||||||
|
|
||||||
So in order to use it, you have to put it behind a TLS reverse proxy, which terminates the ES9+
|
|
||||||
HTTPS from the LPA, and then forwards it as plain HTTP to osmo-smdpp.
|
|
||||||
|
|
||||||
nginx as TLS proxy
|
nginx as TLS proxy
|
||||||
~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
If you use `nginx` as web server, you can use the following configuration snippet::
|
If you chose to use `nginx` as TLS reverse proxy, you can use the following configuration snippet::
|
||||||
|
|
||||||
upstream smdpp {
|
upstream smdpp {
|
||||||
server localhost:8000;
|
server localhost:8000;
|
||||||
@@ -92,32 +97,43 @@ The `smdpp-data/upp` directory contains the UPP (Unprotected Profile Package) us
|
|||||||
commandline options
|
commandline options
|
||||||
~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
Typically, you just run it without any arguments, and it will bind its plain-HTTP ES9+ interface to
|
Typically, you just run osmo-smdpp without any arguments, and it will bind its built-in HTTPS ES9+ interface to
|
||||||
`localhost` TCP port 8000.
|
`localhost` TCP port 443. In this case an external TLS reverse proxy is not needed.
|
||||||
|
|
||||||
osmo-smdpp currently doesn't have any configuration file.
|
osmo-smdpp currently doesn't have any configuration file.
|
||||||
|
|
||||||
There are command line options for binding:
|
There are command line options for binding:
|
||||||
|
|
||||||
Bind the HTTP ES9+ to a port other than 8000::
|
Bind the HTTPS ES9+ to a port other than 443::
|
||||||
|
|
||||||
./osmo-smdpp.py -p 8001
|
./osmo-smdpp.py -p 8443
|
||||||
|
|
||||||
|
Disable the built-in TLS support and bind the plain-HTTP ES9+ to a port 8000::
|
||||||
|
|
||||||
|
./osmo-smdpp.py -p 8000 --nossl
|
||||||
|
|
||||||
Bind the HTTP ES9+ to a different local interface::
|
Bind the HTTP ES9+ to a different local interface::
|
||||||
|
|
||||||
./osmo-smdpp.py -H 127.0.0.1
|
./osmo-smdpp.py -H 127.0.0.2
|
||||||
|
|
||||||
DNS setup for your LPA
|
DNS setup for your LPA
|
||||||
~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
|
The LPA must resolve `testsmdpplus1.example.com` to the IP address of your TLS proxy.
|
||||||
|
|
||||||
It must also accept the TLS certificates used by your TLS proxy.
|
It must also accept the TLS certificates used by your TLS proxy. In case osmo-smdpp is used with built-in TLS support,
|
||||||
|
it will use the certificates provided in smdpp-data.
|
||||||
|
|
||||||
|
NOTE: The HTTPS ES9+ interface cannot be addressed by the LPA directly via its IP address. The reason for this is that
|
||||||
|
the included SGP.26 (DPtls) test certificates explicitly restrict the hostname to `testsmdpplus1.example.com` in the
|
||||||
|
`X509v3 Subject Alternative Name` extension. Using a bare IP address as hostname may cause the certificate to be
|
||||||
|
rejected by the LPA.
|
||||||
|
|
||||||
|
|
||||||
Supported eUICC
|
Supported eUICC
|
||||||
~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
If you run osmo-smdpp with the included SGP.26 certificates, you must use an eUICC with matching SGP.26
|
If you run osmo-smdpp with the included SGP.26 (DPauth, DPpb) certificates, you must use an eUICC with matching SGP.26
|
||||||
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
|
certificates, i.e. the EUM certificate must be signed by a SGP.26 test root CA and the eUICC certificate
|
||||||
in turn must be signed by that SGP.26 EUM certificate.
|
in turn must be signed by that SGP.26 EUM certificate.
|
||||||
|
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ The response body is a JSON document, either
|
|||||||
#. key freshness failure
|
#. key freshness failure
|
||||||
#. unspecified card error
|
#. unspecified card error
|
||||||
|
|
||||||
Example (succcess):
|
Example (success):
|
||||||
::
|
::
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ In any case, in order to operate a SUCI-enabled 5G SA network, you will have to
|
|||||||
#. deploy the public key on your USIMs
|
#. deploy the public key on your USIMs
|
||||||
#. deploy the private key on your 5GC, specifically the UDM function
|
#. deploy the private key on your 5GC, specifically the UDM function
|
||||||
|
|
||||||
pysim contains (int its `contrib` directory) a small utility program that can make it easy to generate
|
pysim contains (in its `contrib` directory) a small utility program that can make it easy to generate
|
||||||
such keys: `suci-keytool.py`
|
such keys: `suci-keytool.py`
|
||||||
|
|
||||||
Generating keys
|
Generating keys
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ This guide covers the basic workflow of provisioning SIM cards with the 5G SUCI
|
|||||||
|
|
||||||
For specific information on sysmocom SIM cards, refer to
|
For specific information on sysmocom SIM cards, refer to
|
||||||
|
|
||||||
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the curent
|
* the `sysmoISIM-SJA5 User Manual <https://sysmocom.de/manuals/sysmoisim-sja5-manual.pdf>`__ for the current
|
||||||
sysmoISIM-SJA5 product
|
sysmoISIM-SJA5 product
|
||||||
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
|
* the `sysmoISIM-SJA2 User Manual <https://sysmocom.de/manuals/sysmousim-manual.pdf>`__ for the older
|
||||||
sysmoISIM-SJA2 product
|
sysmoISIM-SJA2 product
|
||||||
|
|||||||
46
osmo-ras.py
Executable file
46
osmo-ras.py
Executable file
@@ -0,0 +1,46 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# Remote Application Server for Remote Application Management over HTTP
|
||||||
|
# See Amendment B of the GlobalPlatform Card Specification v2.2
|
||||||
|
#
|
||||||
|
# (C) 2025 sysmocom s.f.m.c.
|
||||||
|
# Author: Daniel Willmann <dwillmann@sysmocom.de>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
||||||
|
from ssl import PROTOCOL_TLS_SERVER, SSLContext, TLSVersion
|
||||||
|
|
||||||
|
|
||||||
|
context = SSLContext(PROTOCOL_TLS_SERVER)
|
||||||
|
context.maximum_version = TLSVersion.TLSv1_2
|
||||||
|
CIPHERS_1_0 = "TLS_PSK_WITH_3DES_EDE_CBC_SHA,TLS_PSK_WITH_AES_128_CBC_SHA,TLS_PSK_WITH_NULL_SHA"
|
||||||
|
CIPHERS_1_2 = "TLS_PSK_WITH_AES_128_CBC_SHA256,TLS_PSK_WITH_NULL_SHA256"
|
||||||
|
context.set_ciphers(CIPHERS_1_2)
|
||||||
|
|
||||||
|
# A table using the identity of the client:
|
||||||
|
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
|
||||||
|
'ClientId_2': bytes.fromhex('facade')
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_psk(ident):
|
||||||
|
""" Get the PSK for the client """
|
||||||
|
print(f"Get PSK for {ident}")
|
||||||
|
return psk_table.get(ident, b'')
|
||||||
|
|
||||||
|
context.set_psk_server_callback(get_psk)
|
||||||
|
|
||||||
|
server = HTTPServer(("0.0.0.0", 8080), SimpleHTTPRequestHandler)
|
||||||
|
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||||
|
server.serve_forever()
|
||||||
301
osmo-smdpp.py
301
osmo-smdpp.py
@@ -17,35 +17,124 @@
|
|||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
|
# asn1tools issue https://github.com/eerimoq/asn1tools/issues/194
|
||||||
from cryptography import x509
|
# must be first here
|
||||||
from cryptography.exceptions import InvalidSignature
|
|
||||||
from cryptography.hazmat.primitives import hashes
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec, dh
|
|
||||||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat
|
|
||||||
from pathlib import Path
|
|
||||||
import json
|
|
||||||
import sys
|
|
||||||
import argparse
|
|
||||||
import uuid
|
|
||||||
import os
|
|
||||||
import functools
|
|
||||||
from typing import Optional, Dict, List
|
|
||||||
from pprint import pprint as pp
|
|
||||||
|
|
||||||
import base64
|
|
||||||
from base64 import b64decode
|
|
||||||
from klein import Klein
|
|
||||||
from twisted.web.iweb import IRequest
|
|
||||||
import asn1tools
|
import asn1tools
|
||||||
|
import asn1tools.codecs.ber
|
||||||
|
import asn1tools.codecs.der
|
||||||
|
# do not move the code
|
||||||
|
def fix_asn1_oid_decoding():
|
||||||
|
fix_asn1_schema = """
|
||||||
|
TestModule DEFINITIONS ::= BEGIN
|
||||||
|
TestOid ::= SEQUENCE {
|
||||||
|
oid OBJECT IDENTIFIER
|
||||||
|
}
|
||||||
|
END
|
||||||
|
"""
|
||||||
|
|
||||||
from osmocom.utils import h2b, b2h, swap_nibbles
|
fix_asn1_asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||||
|
fix_asn1_oid_string = '2.999.10'
|
||||||
|
fix_asn1_encoded = fix_asn1_asn1.encode('TestOid', {'oid': fix_asn1_oid_string})
|
||||||
|
fix_asn1_decoded = fix_asn1_asn1.decode('TestOid', fix_asn1_encoded)
|
||||||
|
|
||||||
import pySim.esim.rsp as rsp
|
if (fix_asn1_decoded['oid'] != fix_asn1_oid_string):
|
||||||
from pySim.esim import saip, PMO
|
# ASN.1 OBJECT IDENTIFIER Decoding Issue:
|
||||||
from pySim.esim.es8p import *
|
#
|
||||||
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
|
# In ASN.1 BER/DER encoding, the first two arcs of an OBJECT IDENTIFIER are
|
||||||
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
|
# combined into a single value: (40 * arc0) + arc1. This is encoded as a base-128
|
||||||
|
# variable-length quantity (and commonly known as VLQ or base-128 encoding)
|
||||||
|
# as specified in ITU-T X.690 §8.19, it can span multiple bytes if
|
||||||
|
# the value is large.
|
||||||
|
#
|
||||||
|
# For arc0 = 0 or 1, arc1 must be in [0, 39]. For arc0 = 2, arc1 can be any non-negative integer.
|
||||||
|
# All subsequent arcs (arc2, arc3, ...) are each encoded as a separate base-128 VLQ.
|
||||||
|
#
|
||||||
|
# The decoding bug occurs when the decoder does not properly split the first
|
||||||
|
# subidentifier for arc0 = 2 and arc1 >= 40. Instead of decoding:
|
||||||
|
# - arc0 = 2
|
||||||
|
# - arc1 = (first_subidentifier - 80)
|
||||||
|
# it may incorrectly interpret the first_subidentifier as arc0 = (first_subidentifier // 40),
|
||||||
|
# arc1 = (first_subidentifier % 40), which is only valid for arc1 < 40.
|
||||||
|
#
|
||||||
|
# This patch handles it properly for all valid OBJECT IDENTIFIERs
|
||||||
|
# with large second arcs, by applying the ASN.1 rules:
|
||||||
|
# - if first_subidentifier < 40: arc0 = 0, arc1 = first_subidentifier
|
||||||
|
# - elif first_subidentifier < 80: arc0 = 1, arc1 = first_subidentifier - 40
|
||||||
|
# - else: arc0 = 2, arc1 = first_subidentifier - 80
|
||||||
|
#
|
||||||
|
# This problem is not uncommon, see for example https://github.com/randombit/botan/issues/4023
|
||||||
|
|
||||||
|
def fixed_decode_object_identifier(data, offset, end_offset):
|
||||||
|
"""Decode ASN.1 OBJECT IDENTIFIER from bytes to dotted string, fixing large second arc handling."""
|
||||||
|
def read_subidentifier(data, offset):
|
||||||
|
value = 0
|
||||||
|
while True:
|
||||||
|
b = data[offset]
|
||||||
|
value = (value << 7) | (b & 0x7F)
|
||||||
|
offset += 1
|
||||||
|
if not (b & 0x80):
|
||||||
|
break
|
||||||
|
return value, offset
|
||||||
|
|
||||||
|
subid, offset = read_subidentifier(data, offset)
|
||||||
|
if subid < 40:
|
||||||
|
first = 0
|
||||||
|
second = subid
|
||||||
|
elif subid < 80:
|
||||||
|
first = 1
|
||||||
|
second = subid - 40
|
||||||
|
else:
|
||||||
|
first = 2
|
||||||
|
second = subid - 80
|
||||||
|
arcs = [first, second]
|
||||||
|
|
||||||
|
while offset < end_offset:
|
||||||
|
subid, offset = read_subidentifier(data, offset)
|
||||||
|
arcs.append(subid)
|
||||||
|
|
||||||
|
return '.'.join(str(x) for x in arcs)
|
||||||
|
|
||||||
|
asn1tools.codecs.ber.decode_object_identifier = fixed_decode_object_identifier
|
||||||
|
asn1tools.codecs.der.decode_object_identifier = fixed_decode_object_identifier
|
||||||
|
|
||||||
|
# test our patch
|
||||||
|
asn1 = asn1tools.compile_string(fix_asn1_schema, codec='der')
|
||||||
|
decoded = asn1.decode('TestOid', fix_asn1_encoded)['oid']
|
||||||
|
assert fix_asn1_oid_string == str(decoded)
|
||||||
|
|
||||||
|
fix_asn1_oid_decoding()
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature # noqa: E402
|
||||||
|
from cryptography import x509 # noqa: E402
|
||||||
|
from cryptography.exceptions import InvalidSignature # noqa: E402
|
||||||
|
from cryptography.hazmat.primitives import hashes # noqa: E402
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec, dh # noqa: E402
|
||||||
|
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat # noqa: E402
|
||||||
|
from pathlib import Path # noqa: E402
|
||||||
|
import json # noqa: E402
|
||||||
|
import sys # noqa: E402
|
||||||
|
import argparse # noqa: E402
|
||||||
|
import uuid # noqa: E402
|
||||||
|
import os # noqa: E402
|
||||||
|
import functools # noqa: E402
|
||||||
|
from typing import Optional, Dict, List # noqa: E402
|
||||||
|
from pprint import pprint as pp # noqa: E402
|
||||||
|
|
||||||
|
import base64 # noqa: E402
|
||||||
|
from base64 import b64decode # noqa: E402
|
||||||
|
from klein import Klein # noqa: E402
|
||||||
|
from twisted.web.iweb import IRequest # noqa: E402
|
||||||
|
|
||||||
|
from osmocom.utils import h2b, b2h, swap_nibbles # noqa: E402
|
||||||
|
|
||||||
|
import pySim.esim.rsp as rsp # noqa: E402
|
||||||
|
from pySim.esim import saip, PMO # noqa: E402
|
||||||
|
from pySim.esim.es8p import ProfileMetadata,UnprotectedProfilePackage,ProtectedProfilePackage,BoundProfilePackage,BspInstance # noqa: E402
|
||||||
|
from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id # noqa: E402
|
||||||
|
from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError # noqa: E402
|
||||||
|
|
||||||
|
import logging # noqa: E402
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# HACK: make this configurable
|
# HACK: make this configurable
|
||||||
DATA_DIR = './smdpp-data'
|
DATA_DIR = './smdpp-data'
|
||||||
@@ -82,18 +171,18 @@ def get_eum_certificate_variant(eum_cert) -> str:
|
|||||||
|
|
||||||
for policy in cert_policies_ext.value:
|
for policy in cert_policies_ext.value:
|
||||||
policy_oid = policy.policy_identifier.dotted_string
|
policy_oid = policy.policy_identifier.dotted_string
|
||||||
print(f"Found certificate policy: {policy_oid}")
|
logger.debug(f"Found certificate policy: {policy_oid}")
|
||||||
|
|
||||||
if policy_oid == '2.23.146.1.2.1.2':
|
if policy_oid == '2.23.146.1.2.1.2':
|
||||||
print("Detected EUM certificate variant: O (old)")
|
logger.debug("Detected EUM certificate variant: O (old)")
|
||||||
return 'O'
|
return 'O'
|
||||||
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
elif policy_oid == '2.23.146.1.2.1.0.0.0':
|
||||||
print("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)")
|
||||||
return 'NEW'
|
return 'NEW'
|
||||||
except x509.ExtensionNotFound:
|
except x509.ExtensionNotFound:
|
||||||
print("No Certificate Policies extension found")
|
logger.debug("No Certificate Policies extension found")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error checking certificate policies: {e}")
|
logger.debug(f"Error checking certificate policies: {e}")
|
||||||
|
|
||||||
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
||||||
"""Extract permitted IINs from EUM certificate using the appropriate method
|
"""Extract permitted IINs from EUM certificate using the appropriate method
|
||||||
@@ -106,17 +195,15 @@ def parse_permitted_eins_from_cert(eum_cert) -> List[str]:
|
|||||||
|
|
||||||
if cert_variant == 'O':
|
if cert_variant == 'O':
|
||||||
# Old variant - use nameConstraints extension
|
# Old variant - use nameConstraints extension
|
||||||
print("Using nameConstraints parsing for variant O certificate")
|
|
||||||
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
permitted_iins.extend(_parse_name_constraints_eins(eum_cert))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
# New variants (Ov3, A, B, C) - use GSMA permittedEins extension
|
||||||
print("Using GSMA permittedEins parsing for newer certificate variant")
|
|
||||||
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert))
|
||||||
|
|
||||||
unique_iins = list(set(permitted_iins))
|
unique_iins = list(set(permitted_iins))
|
||||||
|
|
||||||
print(f"Total unique permitted IINs found: {len(unique_iins)}")
|
logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}")
|
||||||
return unique_iins
|
return unique_iins
|
||||||
|
|
||||||
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
||||||
@@ -130,15 +217,13 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
|||||||
|
|
||||||
for ext in eum_cert.extensions:
|
for ext in eum_cert.extensions:
|
||||||
if ext.oid == permitted_eins_oid:
|
if ext.oid == permitted_eins_oid:
|
||||||
print(f"Found GSMA permittedEins extension: {ext.oid}")
|
logger.debug(f"Found GSMA permittedEins extension: {ext.oid}")
|
||||||
|
|
||||||
# Get the DER-encoded extension value
|
# Get the DER-encoded extension value
|
||||||
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value
|
||||||
|
|
||||||
if isinstance(ext_der, bytes):
|
if isinstance(ext_der, bytes):
|
||||||
try:
|
try:
|
||||||
import asn1tools
|
|
||||||
|
|
||||||
permitted_eins_schema = """
|
permitted_eins_schema = """
|
||||||
PermittedEins DEFINITIONS ::= BEGIN
|
PermittedEins DEFINITIONS ::= BEGIN
|
||||||
PermittedEins ::= SEQUENCE OF PrintableString
|
PermittedEins ::= SEQUENCE OF PrintableString
|
||||||
@@ -156,14 +241,14 @@ def _parse_gsma_permitted_eins(eum_cert) -> List[str]:
|
|||||||
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
all(c in '0123456789ABCDEF' for c in iin_clean) and
|
||||||
len(iin_clean) % 2 == 0):
|
len(iin_clean) % 2 == 0):
|
||||||
permitted_iins.append(iin_clean)
|
permitted_iins.append(iin_clean)
|
||||||
print(f"Found permitted IIN (GSMA): {iin_clean}")
|
logger.debug(f"Found permitted IIN (GSMA): {iin_clean}")
|
||||||
else:
|
else:
|
||||||
print(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error parsing GSMA permittedEins extension: {e}")
|
logger.debug(f"Error parsing GSMA permittedEins extension: {e}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error accessing GSMA certificate extensions: {e}")
|
logger.debug(f"Error accessing GSMA certificate extensions: {e}")
|
||||||
|
|
||||||
return permitted_iins
|
return permitted_iins
|
||||||
|
|
||||||
@@ -178,13 +263,11 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
x509.oid.ExtensionOID.NAME_CONSTRAINTS
|
||||||
)
|
)
|
||||||
|
|
||||||
print("Found nameConstraints extension (variant O)")
|
|
||||||
name_constraints = name_constraints_ext.value
|
name_constraints = name_constraints_ext.value
|
||||||
|
|
||||||
# Check permittedSubtrees for IIN constraints
|
# Check permittedSubtrees for IIN constraints
|
||||||
if name_constraints.permitted_subtrees:
|
if name_constraints.permitted_subtrees:
|
||||||
for subtree in name_constraints.permitted_subtrees:
|
for subtree in name_constraints.permitted_subtrees:
|
||||||
print(f"Processing permitted subtree: {subtree}")
|
|
||||||
|
|
||||||
if isinstance(subtree, x509.DirectoryName):
|
if isinstance(subtree, x509.DirectoryName):
|
||||||
for attribute in subtree.value:
|
for attribute in subtree.value:
|
||||||
@@ -196,12 +279,12 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
all(c in '0123456789ABCDEF' for c in serial_value) and
|
all(c in '0123456789ABCDEF' for c in serial_value) and
|
||||||
len(serial_value) % 2 == 0):
|
len(serial_value) % 2 == 0):
|
||||||
permitted_iins.append(serial_value)
|
permitted_iins.append(serial_value)
|
||||||
print(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}")
|
||||||
|
|
||||||
except x509.ExtensionNotFound:
|
except x509.ExtensionNotFound:
|
||||||
print("No nameConstraints extension found")
|
logger.debug("No nameConstraints extension found")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error parsing nameConstraints: {e}")
|
logger.debug(f"Error parsing nameConstraints: {e}")
|
||||||
|
|
||||||
return permitted_iins
|
return permitted_iins
|
||||||
|
|
||||||
@@ -209,29 +292,29 @@ def _parse_name_constraints_eins(eum_cert) -> List[str]:
|
|||||||
def validate_eid_range(eid: str, eum_cert) -> bool:
|
def validate_eid_range(eid: str, eum_cert) -> bool:
|
||||||
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
"""Validate that EID is within the permitted EINs of the EUM certificate."""
|
||||||
if not eid or len(eid) != 32:
|
if not eid or len(eid) != 32:
|
||||||
print(f"Invalid EID format: {eid}")
|
logger.debug(f"Invalid EID format: {eid}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
permitted_eins = parse_permitted_eins_from_cert(eum_cert)
|
||||||
|
|
||||||
if not permitted_eins:
|
if not permitted_eins:
|
||||||
print("Warning: No permitted EINs found in EUM certificate")
|
logger.debug("Warning: No permitted EINs found in EUM certificate")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
eid_normalized = eid.upper()
|
eid_normalized = eid.upper()
|
||||||
print(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs")
|
||||||
|
|
||||||
for permitted_ein in permitted_eins:
|
for permitted_ein in permitted_eins:
|
||||||
if eid_normalized.startswith(permitted_ein):
|
if eid_normalized.startswith(permitted_ein):
|
||||||
print(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
print(f"EID {eid_normalized} is not in any permitted EIN list")
|
logger.debug(f"EID {eid_normalized} is not in any permitted EIN list")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error validating EID: {e}")
|
logger.debug(f"Error validating EID: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
|
||||||
@@ -299,16 +382,30 @@ class SmDppHttpServer:
|
|||||||
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
|
||||||
"""Find CI certificate for given key identifier."""
|
"""Find CI certificate for given key identifier."""
|
||||||
for cert in self.ci_certs:
|
for cert in self.ci_certs:
|
||||||
print("cert: %s" % cert)
|
logger.debug("cert: %s" % cert)
|
||||||
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
|
||||||
print(subject_exts)
|
logger.debug(subject_exts)
|
||||||
subject_pkid = subject_exts[0].value
|
subject_pkid = subject_exts[0].value
|
||||||
print(subject_pkid)
|
logger.debug(subject_pkid)
|
||||||
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
if subject_pkid and subject_pkid.key_identifier == ci_pkid:
|
||||||
return cert
|
return cert
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False):
|
def validate_certificate_chain_for_verification(self, euicc_ci_pkid_list: List[bytes]) -> bool:
|
||||||
|
"""Validate that SM-DP+ has valid certificate chains for the given CI PKIDs."""
|
||||||
|
for ci_pkid in euicc_ci_pkid_list:
|
||||||
|
ci_cert = self.ci_get_cert_for_pkid(ci_pkid)
|
||||||
|
if ci_cert:
|
||||||
|
# Check if our DPauth certificate chains to this CI
|
||||||
|
try:
|
||||||
|
cs = CertificateSet(ci_cert)
|
||||||
|
cs.verify_cert_chain(self.dp_auth.cert)
|
||||||
|
return True
|
||||||
|
except VerifyError:
|
||||||
|
continue
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False):
|
||||||
self.server_hostname = server_hostname
|
self.server_hostname = server_hostname
|
||||||
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
|
self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
|
||||||
self.ci_certs = self.load_certs_from_path(ci_certs_path)
|
self.ci_certs = self.load_certs_from_path(ci_certs_path)
|
||||||
@@ -329,7 +426,15 @@ class SmDppHttpServer:
|
|||||||
else:
|
else:
|
||||||
self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
|
self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
|
||||||
self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
|
self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
|
||||||
self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
|
if in_memory:
|
||||||
|
self.rss = rsp.RspSessionStore(in_memory=True)
|
||||||
|
logger.info("Using in-memory session storage")
|
||||||
|
else:
|
||||||
|
# Use different session database files for BRP and NIST to avoid file locking during concurrent runs
|
||||||
|
session_db_suffix = "BRP" if use_brainpool else "NIST"
|
||||||
|
db_path = os.path.join(DATA_DIR, f"sm-dp-sessions-{session_db_suffix}")
|
||||||
|
self.rss = rsp.RspSessionStore(filename=db_path, in_memory=False)
|
||||||
|
logger.info(f"Using file-based session storage: {db_path}")
|
||||||
|
|
||||||
@app.handle_errors(ApiError)
|
@app.handle_errors(ApiError)
|
||||||
def handle_apierror(self, request: IRequest, failure):
|
def handle_apierror(self, request: IRequest, failure):
|
||||||
@@ -356,7 +461,7 @@ class SmDppHttpServer:
|
|||||||
validate_request_headers(request)
|
validate_request_headers(request)
|
||||||
|
|
||||||
content = json.loads(request.content.read())
|
content = json.loads(request.content.read())
|
||||||
print("Rx JSON: %s" % json.dumps(content))
|
logger.debug("Rx JSON: %s" % json.dumps(content))
|
||||||
set_headers(request)
|
set_headers(request)
|
||||||
|
|
||||||
output = func(self, request, content)
|
output = func(self, request, content)
|
||||||
@@ -364,7 +469,7 @@ class SmDppHttpServer:
|
|||||||
return ''
|
return ''
|
||||||
|
|
||||||
build_resp_header(output)
|
build_resp_header(output)
|
||||||
print("Tx JSON: %s" % json.dumps(output))
|
logger.debug("Tx JSON: %s" % json.dumps(output))
|
||||||
return json.dumps(output)
|
return json.dumps(output)
|
||||||
return _api_wrapper
|
return _api_wrapper
|
||||||
|
|
||||||
@@ -383,7 +488,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
euiccInfo1_bin = b64decode(content['euiccInfo1'])
|
||||||
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
|
||||||
print("Rx euiccInfo1: %s" % euiccInfo1)
|
logger.debug("Rx euiccInfo1: %s" % euiccInfo1)
|
||||||
#euiccInfo1['svn']
|
#euiccInfo1['svn']
|
||||||
|
|
||||||
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
# TODO: If euiccCiPKIdListForSigningV3 is present ...
|
||||||
@@ -391,6 +496,12 @@ class SmDppHttpServer:
|
|||||||
pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
|
pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
|
||||||
if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
|
if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
|
||||||
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
|
pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
|
||||||
|
|
||||||
|
# Validate that SM-DP+ supports certificate chains for verification
|
||||||
|
verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', [])
|
||||||
|
if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list):
|
||||||
|
raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC')
|
||||||
|
|
||||||
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
|
# verify it supports one of the keys indicated by euiccCiPKIdListForSigning
|
||||||
ci_cert = None
|
ci_cert = None
|
||||||
for x in pkid_list:
|
for x in pkid_list:
|
||||||
@@ -405,13 +516,6 @@ class SmDppHttpServer:
|
|||||||
if not ci_cert:
|
if not ci_cert:
|
||||||
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
|
raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
|
||||||
|
|
||||||
# TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
|
|
||||||
# * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
|
|
||||||
# supported by the eUICC (indicated by euiccCiPKIdListForVerification).
|
|
||||||
# * Using a certificate chain that the eUICC and the LPA both support:
|
|
||||||
#euiccInfo1['euiccCiPKIdListForVerification']
|
|
||||||
# raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
|
|
||||||
|
|
||||||
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
|
# Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
|
||||||
# SHALL be unique within the scope and lifetime of each SM-DP+.
|
# SHALL be unique within the scope and lifetime of each SM-DP+.
|
||||||
transactionId = uuid.uuid4().hex.upper()
|
transactionId = uuid.uuid4().hex.upper()
|
||||||
@@ -427,9 +531,9 @@ class SmDppHttpServer:
|
|||||||
'serverAddress': self.server_hostname,
|
'serverAddress': self.server_hostname,
|
||||||
'serverChallenge': serverChallenge,
|
'serverChallenge': serverChallenge,
|
||||||
}
|
}
|
||||||
print("Tx serverSigned1: %s" % serverSigned1)
|
logger.debug("Tx serverSigned1: %s" % serverSigned1)
|
||||||
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
|
||||||
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
logger.debug("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
|
||||||
output = {}
|
output = {}
|
||||||
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
|
||||||
|
|
||||||
@@ -458,7 +562,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
|
||||||
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
|
||||||
print("Rx %s: %s" % authenticateServerResp)
|
logger.debug("Rx %s: %s" % authenticateServerResp)
|
||||||
if authenticateServerResp[0] == 'authenticateResponseError':
|
if authenticateServerResp[0] == 'authenticateResponseError':
|
||||||
r_err = authenticateServerResp[1]
|
r_err = authenticateServerResp[1]
|
||||||
#r_err['transactionId']
|
#r_err['transactionId']
|
||||||
@@ -510,7 +614,7 @@ class SmDppHttpServer:
|
|||||||
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)')
|
||||||
|
|
||||||
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
|
||||||
print("EID (from eUICC cert): %s" % ss.eid)
|
logger.debug("EID (from eUICC cert): %s" % ss.eid)
|
||||||
|
|
||||||
# Verify EID is within permitted range of EUM certificate
|
# Verify EID is within permitted range of EUM certificate
|
||||||
if not validate_eid_range(ss.eid, eum_cert):
|
if not validate_eid_range(ss.eid, eum_cert):
|
||||||
@@ -525,6 +629,7 @@ class SmDppHttpServer:
|
|||||||
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
|
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
|
||||||
# TODO: We really do a very simplistic job here, this needs to be properly implemented later,
|
# TODO: We really do a very simplistic job here, this needs to be properly implemented later,
|
||||||
# considering all the various cases, profile state, etc.
|
# considering all the various cases, profile state, etc.
|
||||||
|
iccid_str = None
|
||||||
if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
|
if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
|
||||||
cpca = euiccSigned1['ctxParams1'][1]
|
cpca = euiccSigned1['ctxParams1'][1]
|
||||||
matchingId = cpca.get('matchingId', None)
|
matchingId = cpca.get('matchingId', None)
|
||||||
@@ -547,7 +652,7 @@ class SmDppHttpServer:
|
|||||||
# there's currently no other option in the ctxParams1 choice, so this cannot happen
|
# there's currently no other option in the ctxParams1 choice, so this cannot happen
|
||||||
raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication')
|
raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication')
|
||||||
|
|
||||||
# FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
|
# FIXME: we actually want to perform the profile binding herr, and read the profile metadata from the profile
|
||||||
|
|
||||||
# Put together profileMetadata + _bin
|
# Put together profileMetadata + _bin
|
||||||
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
|
ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
|
||||||
@@ -589,7 +694,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
|
||||||
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
|
||||||
print("Rx %s: %s" % prepDownloadResp)
|
logger.debug("Rx %s: %s" % prepDownloadResp)
|
||||||
|
|
||||||
if prepDownloadResp[0] == 'downloadResponseError':
|
if prepDownloadResp[0] == 'downloadResponseError':
|
||||||
r_err = prepDownloadResp[1]
|
r_err = prepDownloadResp[1]
|
||||||
@@ -611,37 +716,37 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
# store otPK.EUICC.ECKA in session state
|
# store otPK.EUICC.ECKA in session state
|
||||||
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
ss.euicc_otpk = euiccSigned2['euiccOtpk']
|
||||||
print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
logger.debug("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
|
||||||
|
|
||||||
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
# Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
|
||||||
# Reference value of CERT.DPpb.ECDDSA
|
# Reference value of CERT.DPpb.ECDDSA
|
||||||
print("curve = %s" % self.dp_pb.get_curve())
|
logger.debug("curve = %s" % self.dp_pb.get_curve())
|
||||||
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
|
||||||
# extract the public key in (hopefully) the right format for the ES8+ interface
|
# extract the public key in (hopefully) the right format for the ES8+ interface
|
||||||
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||||
print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk))
|
||||||
print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
|
||||||
|
|
||||||
ss.host_id = b'mahlzeit'
|
ss.host_id = b'mahlzeit'
|
||||||
|
|
||||||
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
# Generate Session Keys using the CRT, otPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
|
||||||
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
|
||||||
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
|
||||||
print("shared_secret: %s" % b2h(ss.shared_secret))
|
logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
|
||||||
|
|
||||||
# TODO: Check if this order requires a Confirmation Code verification
|
# TODO: Check if this order requires a Confirmation Code verification
|
||||||
|
|
||||||
# Perform actual protection + binding of profile package (or return pre-bound one)
|
# Perform actual protection + binding of profile package (or return pre-bound one)
|
||||||
with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
|
with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
|
||||||
upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
|
upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
|
||||||
# HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
|
# HACK: Use empty PPP as we're still debugging the configureISDP step, and we want to avoid
|
||||||
# cluttering the log with stuff happening after the failure
|
# cluttering the log with stuff happening after the failure
|
||||||
#upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
|
#upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
|
||||||
if False:
|
if False:
|
||||||
# Use random keys
|
# Use random keys
|
||||||
bpp = BoundProfilePackage.from_upp(upp)
|
bpp = BoundProfilePackage.from_upp(upp)
|
||||||
else:
|
else:
|
||||||
# Use sesssion keys
|
# Use session keys
|
||||||
ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
|
ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
|
||||||
bpp = BoundProfilePackage.from_ppp(ppp)
|
bpp = BoundProfilePackage.from_ppp(ppp)
|
||||||
|
|
||||||
@@ -661,22 +766,22 @@ class SmDppHttpServer:
|
|||||||
request.setResponseCode(204)
|
request.setResponseCode(204)
|
||||||
pendingNotification_bin = b64decode(content['pendingNotification'])
|
pendingNotification_bin = b64decode(content['pendingNotification'])
|
||||||
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
|
||||||
print("Rx %s: %s" % pendingNotification)
|
logger.debug("Rx %s: %s" % pendingNotification)
|
||||||
if pendingNotification[0] == 'profileInstallationResult':
|
if pendingNotification[0] == 'profileInstallationResult':
|
||||||
profileInstallRes = pendingNotification[1]
|
profileInstallRes = pendingNotification[1]
|
||||||
pird = profileInstallRes['profileInstallationResultData']
|
pird = profileInstallRes['profileInstallationResultData']
|
||||||
transactionId = b2h(pird['transactionId'])
|
transactionId = b2h(pird['transactionId'])
|
||||||
ss = self.rss.get(transactionId, None)
|
ss = self.rss.get(transactionId, None)
|
||||||
if ss is None:
|
if ss is None:
|
||||||
print("Unable to find session for transactionId")
|
logger.warning(f"Unable to find session for transactionId: {transactionId}")
|
||||||
return
|
return None # Will return HTTP 204 with empty body
|
||||||
profileInstallRes['euiccSignPIR']
|
profileInstallRes['euiccSignPIR']
|
||||||
# TODO: use original data, don't re-encode?
|
# TODO: use original data, don't re-encode?
|
||||||
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
|
||||||
# verify eUICC signature
|
# verify eUICC signature
|
||||||
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
|
||||||
raise Exception('ECDSA signature verification failed on notification')
|
raise Exception('ECDSA signature verification failed on notification')
|
||||||
print("Profile Installation Final Result: ", pird['finalResult'])
|
logger.debug("Profile Installation Final Result: %s", pird['finalResult'])
|
||||||
# remove session state
|
# remove session state
|
||||||
del self.rss[transactionId]
|
del self.rss[transactionId]
|
||||||
elif pendingNotification[0] == 'otherSignedNotification':
|
elif pendingNotification[0] == 'otherSignedNotification':
|
||||||
@@ -701,7 +806,7 @@ class SmDppHttpServer:
|
|||||||
iccid = other_notif.get('iccid', None)
|
iccid = other_notif.get('iccid', None)
|
||||||
if iccid:
|
if iccid:
|
||||||
iccid = swap_nibbles(b2h(iccid))
|
iccid = swap_nibbles(b2h(iccid))
|
||||||
print("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid))
|
||||||
else:
|
else:
|
||||||
raise ValueError(pendingNotification)
|
raise ValueError(pendingNotification)
|
||||||
|
|
||||||
@@ -714,7 +819,7 @@ class SmDppHttpServer:
|
|||||||
@rsp_api_wrapper
|
@rsp_api_wrapper
|
||||||
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
def cancelSession(self, request: IRequest, content: dict) -> dict:
|
||||||
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
|
||||||
print("Rx JSON: %s" % content)
|
logger.debug("Rx JSON: %s" % content)
|
||||||
transactionId = content['transactionId']
|
transactionId = content['transactionId']
|
||||||
|
|
||||||
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
# Verify that the received transactionId is known and relates to an ongoing RSP session
|
||||||
@@ -724,7 +829,7 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
|
||||||
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
|
||||||
print("Rx %s: %s" % cancelSessionResponse)
|
logger.debug("Rx %s: %s" % cancelSessionResponse)
|
||||||
|
|
||||||
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
if cancelSessionResponse[0] == 'cancelSessionResponseError':
|
||||||
# FIXME: print some error
|
# FIXME: print some error
|
||||||
@@ -756,20 +861,28 @@ class SmDppHttpServer:
|
|||||||
|
|
||||||
def main(argv):
|
def main(argv):
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
|
parser.add_argument("-H", "--host", help="Host/IP to bind HTTP(S) to", default="localhost")
|
||||||
parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
|
parser.add_argument("-p", "--port", help="TCP port to bind HTTP(S) to", default=443)
|
||||||
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
parser.add_argument("-c", "--certdir", help=f"cert subdir relative to {DATA_DIR}", default="certs")
|
||||||
parser.add_argument("-s", "--nossl", help="do NOT use ssl", action='store_true', default=False)
|
parser.add_argument("-s", "--nossl", help="disable built in SSL/TLS support", action='store_true', default=False)
|
||||||
|
parser.add_argument("-v", "--verbose", help="dump more raw info", action='store_true', default=False)
|
||||||
|
parser.add_argument("-b", "--brainpool", help="Use Brainpool curves instead of NIST",
|
||||||
|
action='store_true', default=False)
|
||||||
|
parser.add_argument("-m", "--in-memory", help="Use ephermal in-memory session storage (for concurrent runs)",
|
||||||
|
action='store_true', default=False)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
|
||||||
|
|
||||||
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
common_cert_path = os.path.join(DATA_DIR, args.certdir)
|
||||||
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=False)
|
hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool)
|
||||||
if(args.nossl):
|
if(args.nossl):
|
||||||
hs.app.run(args.host, args.port)
|
hs.app.run(args.host, args.port)
|
||||||
else:
|
else:
|
||||||
cert_derpath = Path(common_cert_path) / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.der'
|
curve_type = 'BRP' if args.brainpool else 'NIST'
|
||||||
cert_pempath = Path(common_cert_path) / 'DPtls' / 'CERT_S_SM_DP_TLS_NIST.pem'
|
cert_derpath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.der'
|
||||||
cert_skpath = Path(common_cert_path) / 'DPtls' / 'SK_S_SM_DP_TLS_NIST.pem'
|
cert_pempath = Path(common_cert_path) / 'DPtls' / f'CERT_S_SM_DP_TLS_{curve_type}.pem'
|
||||||
|
cert_skpath = Path(common_cert_path) / 'DPtls' / f'SK_S_SM_DP_TLS_{curve_type}.pem'
|
||||||
dhparam_path = Path(common_cert_path) / "dhparam2048.pem"
|
dhparam_path = Path(common_cert_path) / "dhparam2048.pem"
|
||||||
if not dhparam_path.exists():
|
if not dhparam_path.exists():
|
||||||
print("Generating dh params, this takes a few seconds..")
|
print("Generating dh params, this takes a few seconds..")
|
||||||
|
|||||||
@@ -586,7 +586,7 @@ def read_params_csv(opts, imsi=None, iccid=None):
|
|||||||
else:
|
else:
|
||||||
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
|
row['mnc'] = row.get('mnc', mnc_from_imsi(row.get('imsi'), False))
|
||||||
|
|
||||||
# NOTE: We might concider to specify a new CSV field "mnclen" in our
|
# NOTE: We might consider to specify a new CSV field "mnclen" in our
|
||||||
# CSV files for a better automatization. However, this only makes sense
|
# CSV files for a better automatization. However, this only makes sense
|
||||||
# when the tools and databases we export our files from will also add
|
# when the tools and databases we export our files from will also add
|
||||||
# such a field.
|
# such a field.
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
#
|
#
|
||||||
# Utility to display some informations about a SIM card
|
# Utility to display some information about a SIM card
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>
|
# Copyright (C) 2009 Sylvain Munaut <tnt@246tNt.com>
|
||||||
|
|||||||
159
pySim-shell.py
159
pySim-shell.py
@@ -22,19 +22,25 @@ from typing import List, Optional
|
|||||||
import json
|
import json
|
||||||
import traceback
|
import traceback
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import cmd2
|
import cmd2
|
||||||
from packaging import version
|
from packaging import version
|
||||||
from cmd2 import style
|
from cmd2 import style
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
from osmocom.utils import auto_uint8
|
||||||
|
|
||||||
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
|
# cmd2 >= 2.3.0 has deprecated the bg/fg in favor of Bg/Fg :(
|
||||||
if version.parse(cmd2.__version__) < version.parse("2.3.0"):
|
if version.parse(cmd2.__version__) < version.parse("2.3.0"):
|
||||||
from cmd2 import fg, bg # pylint: disable=no-name-in-module
|
from cmd2 import fg, bg # pylint: disable=no-name-in-module
|
||||||
RED = fg.red
|
RED = fg.red
|
||||||
|
YELLOW = fg.yellow
|
||||||
LIGHT_RED = fg.bright_red
|
LIGHT_RED = fg.bright_red
|
||||||
LIGHT_GREEN = fg.bright_green
|
LIGHT_GREEN = fg.bright_green
|
||||||
else:
|
else:
|
||||||
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
|
from cmd2 import Fg, Bg # pylint: disable=no-name-in-module
|
||||||
RED = Fg.RED
|
RED = Fg.RED
|
||||||
|
YELLOW = Fg.YELLOW
|
||||||
LIGHT_RED = Fg.LIGHT_RED
|
LIGHT_RED = Fg.LIGHT_RED
|
||||||
LIGHT_GREEN = Fg.LIGHT_GREEN
|
LIGHT_GREEN = Fg.LIGHT_GREEN
|
||||||
from cmd2 import CommandSet, with_default_category, with_argparser
|
from cmd2 import CommandSet, with_default_category, with_argparser
|
||||||
@@ -63,10 +69,12 @@ from pySim.ts_102_222 import Ts102222Commands
|
|||||||
from pySim.gsm_r import DF_EIRENE
|
from pySim.gsm_r import DF_EIRENE
|
||||||
from pySim.cat import ProactiveCommand
|
from pySim.cat import ProactiveCommand
|
||||||
|
|
||||||
from pySim.card_key_provider import CardKeyProviderCsv, card_key_provider_register, card_key_provider_get_field
|
from pySim.card_key_provider import CardKeyProviderCsv
|
||||||
|
from pySim.card_key_provider import card_key_provider_register, card_key_provider_get_field, card_key_provider_get
|
||||||
|
|
||||||
from pySim.app import init_card
|
from pySim.app import init_card
|
||||||
|
|
||||||
|
log = PySimLogger.get("main")
|
||||||
|
|
||||||
class Cmd2Compat(cmd2.Cmd):
|
class Cmd2Compat(cmd2.Cmd):
|
||||||
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
|
"""Backwards-compatibility wrapper around cmd2.Cmd to support older and newer
|
||||||
@@ -92,15 +100,19 @@ class PysimApp(Cmd2Compat):
|
|||||||
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
|
(C) 2021-2023 by Harald Welte, sysmocom - s.f.m.c. GmbH and contributors
|
||||||
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
|
Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/shell.html """
|
||||||
|
|
||||||
def __init__(self, card, rs, sl, ch, script=None):
|
def __init__(self, verbose, card, rs, sl, ch, script=None):
|
||||||
if version.parse(cmd2.__version__) < version.parse("2.0.0"):
|
if version.parse(cmd2.__version__) < version.parse("2.0.0"):
|
||||||
kwargs = {'use_ipython': True}
|
kwargs = {'use_ipython': True}
|
||||||
else:
|
else:
|
||||||
kwargs = {'include_ipy': True}
|
kwargs = {'include_ipy': True}
|
||||||
|
|
||||||
|
self.verbose = verbose
|
||||||
|
self._onchange_verbose('verbose', False, self.verbose);
|
||||||
|
|
||||||
# pylint: disable=unexpected-keyword-arg
|
# pylint: disable=unexpected-keyword-arg
|
||||||
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
|
super().__init__(persistent_history_file='~/.pysim_shell_history', allow_cli_args=False,
|
||||||
auto_load_commands=False, startup_script=script, **kwargs)
|
auto_load_commands=False, startup_script=script, **kwargs)
|
||||||
|
PySimLogger.setup(self.poutput, {logging.WARN: YELLOW})
|
||||||
self.intro = style(self.BANNER, fg=RED)
|
self.intro = style(self.BANNER, fg=RED)
|
||||||
self.default_category = 'pySim-shell built-in commands'
|
self.default_category = 'pySim-shell built-in commands'
|
||||||
self.card = None
|
self.card = None
|
||||||
@@ -126,6 +138,9 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
self.add_settable(Settable2Compat('apdu_strict', bool,
|
self.add_settable(Settable2Compat('apdu_strict', bool,
|
||||||
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
|
'Enforce APDU responses according to ISO/IEC 7816-3, table 12', self,
|
||||||
onchange_cb=self._onchange_apdu_strict))
|
onchange_cb=self._onchange_apdu_strict))
|
||||||
|
self.add_settable(Settable2Compat('verbose', bool,
|
||||||
|
'Enable/disable verbose logging', self,
|
||||||
|
onchange_cb=self._onchange_verbose))
|
||||||
self.equip(card, rs)
|
self.equip(card, rs)
|
||||||
|
|
||||||
def equip(self, card, rs):
|
def equip(self, card, rs):
|
||||||
@@ -210,6 +225,13 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
else:
|
else:
|
||||||
self.card._scc._tp.apdu_strict = False
|
self.card._scc._tp.apdu_strict = False
|
||||||
|
|
||||||
|
def _onchange_verbose(self, param_name, old, new):
|
||||||
|
PySimLogger.set_verbose(new)
|
||||||
|
if new == True:
|
||||||
|
PySimLogger.set_level(logging.DEBUG)
|
||||||
|
else:
|
||||||
|
PySimLogger.set_level(logging.INFO)
|
||||||
|
|
||||||
class Cmd2ApduTracer(ApduTracer):
|
class Cmd2ApduTracer(ApduTracer):
|
||||||
def __init__(self, cmd2_app):
|
def __init__(self, cmd2_app):
|
||||||
self.cmd2 = cmd2_app
|
self.cmd2 = cmd2_app
|
||||||
@@ -265,7 +287,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
def do_apdu(self, opts):
|
def do_apdu(self, opts):
|
||||||
"""Send a raw APDU to the card, and print SW + Response.
|
"""Send a raw APDU to the card, and print SW + Response.
|
||||||
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
|
CAUTION: this command bypasses the logical channel handling of pySim-shell and card state changes are not
|
||||||
tracked. Dpending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
tracked. Depending on the raw APDU sent, pySim-shell may not continue to work as expected if you e.g. select
|
||||||
a different file."""
|
a different file."""
|
||||||
|
|
||||||
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
|
# When sending raw APDUs we access the scc object through _scc member of the card object. It should also be
|
||||||
@@ -336,7 +358,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
|
|
||||||
def _process_card(self, first, script_path):
|
def _process_card(self, first, script_path):
|
||||||
|
|
||||||
# Early phase of card initialzation (this part may fail with an exception)
|
# Early phase of card initialization (this part may fail with an exception)
|
||||||
try:
|
try:
|
||||||
rs, card = init_card(self.sl)
|
rs, card = init_card(self.sl)
|
||||||
rc = self.equip(card, rs)
|
rc = self.equip(card, rs)
|
||||||
@@ -377,7 +399,7 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
|
|
||||||
bulk_script_parser = argparse.ArgumentParser()
|
bulk_script_parser = argparse.ArgumentParser()
|
||||||
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
|
bulk_script_parser.add_argument('SCRIPT_PATH', help="path to the script file")
|
||||||
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exeption occurs',
|
bulk_script_parser.add_argument('--halt_on_error', help='stop card handling if an exception occurs',
|
||||||
action='store_true')
|
action='store_true')
|
||||||
bulk_script_parser.add_argument('--tries', type=int, default=2,
|
bulk_script_parser.add_argument('--tries', type=int, default=2,
|
||||||
help='how many tries before trying the next card')
|
help='how many tries before trying the next card')
|
||||||
@@ -477,6 +499,23 @@ Online manual available at https://downloads.osmocom.org/docs/pysim/master/html/
|
|||||||
"""Echo (print) a string on the console"""
|
"""Echo (print) a string on the console"""
|
||||||
self.poutput(' '.join(opts.STRING))
|
self.poutput(' '.join(opts.STRING))
|
||||||
|
|
||||||
|
query_card_key_parser = argparse.ArgumentParser()
|
||||||
|
query_card_key_parser.add_argument('FIELDS', help="fields to query", type=str, nargs='+')
|
||||||
|
query_card_key_parser.add_argument('--key', help='lookup key (typically \'ICCID\' or \'EID\')',
|
||||||
|
type=str, required=True)
|
||||||
|
query_card_key_parser.add_argument('--value', help='lookup key match value (e.g \'8988211000000123456\')',
|
||||||
|
type=str, required=True)
|
||||||
|
@cmd2.with_argparser(query_card_key_parser)
|
||||||
|
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||||
|
def do_query_card_key(self, opts):
|
||||||
|
"""Manually query the Card Key Provider"""
|
||||||
|
result = card_key_provider_get(opts.FIELDS, opts.key, opts.value)
|
||||||
|
self.poutput("Result:")
|
||||||
|
if result == {}:
|
||||||
|
self.poutput(" (none)")
|
||||||
|
for k in result.keys():
|
||||||
|
self.poutput(" %s: %s" % (str(k), str(result.get(k))))
|
||||||
|
|
||||||
@cmd2.with_category(CUSTOM_CATEGORY)
|
@cmd2.with_category(CUSTOM_CATEGORY)
|
||||||
def do_version(self, opts):
|
def do_version(self, opts):
|
||||||
"""Print the pySim software version."""
|
"""Print the pySim software version."""
|
||||||
@@ -731,7 +770,7 @@ class PySimCommands(CommandSet):
|
|||||||
body = {}
|
body = {}
|
||||||
for t in tags:
|
for t in tags:
|
||||||
result = self._cmd.lchan.retrieve_data(t)
|
result = self._cmd.lchan.retrieve_data(t)
|
||||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||||
body[t] = b2h(val)
|
body[t] = b2h(val)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
|
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
|
||||||
@@ -915,36 +954,53 @@ class Iso7816Commands(CommandSet):
|
|||||||
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
|
raise RuntimeError("cannot find %s for ICCID '%s'" % (field, iccid))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __select_pin_nr(pin_type:str, pin_nr:int) -> int:
|
||||||
|
if pin_type:
|
||||||
|
# pylint: disable=unsubscriptable-object
|
||||||
|
return pin_names.inverse[pin_type]
|
||||||
|
return pin_nr
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __add_pin_nr_to_ArgumentParser(chv_parser):
|
||||||
|
group = chv_parser.add_mutually_exclusive_group()
|
||||||
|
group.add_argument('--pin-type',
|
||||||
|
choices=[x for x in pin_names.values()
|
||||||
|
if (x.startswith('PIN') or x.startswith('2PIN'))],
|
||||||
|
help='Specifiy pin type (default is PIN1)')
|
||||||
|
group.add_argument('--pin-nr', type=auto_uint8, default=0x01,
|
||||||
|
help='PIN Number, 1=PIN1, 0x81=2PIN1 or custom value (see also TS 102 221, Table 9.3")')
|
||||||
|
|
||||||
verify_chv_parser = argparse.ArgumentParser()
|
verify_chv_parser = argparse.ArgumentParser()
|
||||||
verify_chv_parser.add_argument(
|
|
||||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
|
||||||
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
verify_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
|
__add_pin_nr_to_ArgumentParser(verify_chv_parser)
|
||||||
|
|
||||||
@cmd2.with_argparser(verify_chv_parser)
|
@cmd2.with_argparser(verify_chv_parser)
|
||||||
def do_verify_chv(self, opts):
|
def do_verify_chv(self, opts):
|
||||||
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
|
"""Verify (authenticate) using specified CHV (PIN) code, which is how the specifications
|
||||||
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
|
call it if you authenticate yourself using the specified PIN. There usually is at least PIN1 and
|
||||||
PIN2."""
|
2PIN1 (see also TS 102 221 Section 9.5.1 / Table 9.3)."""
|
||||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||||
(data, sw) = self._cmd.lchan.scc.verify_chv(opts.pin_nr, h2b(pin))
|
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||||
|
(data, sw) = self._cmd.lchan.scc.verify_chv(pin_nr, h2b(pin))
|
||||||
self._cmd.poutput("CHV verification successful")
|
self._cmd.poutput("CHV verification successful")
|
||||||
|
|
||||||
unblock_chv_parser = argparse.ArgumentParser()
|
unblock_chv_parser = argparse.ArgumentParser()
|
||||||
unblock_chv_parser.add_argument(
|
|
||||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
|
||||||
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
|
unblock_chv_parser.add_argument('PUK', nargs='?', type=is_decimal,
|
||||||
help='PUK code value. If none given, CSV file will be queried')
|
help='PUK code value. If none given, CSV file will be queried')
|
||||||
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
|
unblock_chv_parser.add_argument('NEWPIN', nargs='?', type=is_decimal,
|
||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
|
__add_pin_nr_to_ArgumentParser(unblock_chv_parser)
|
||||||
|
|
||||||
@cmd2.with_argparser(unblock_chv_parser)
|
@cmd2.with_argparser(unblock_chv_parser)
|
||||||
def do_unblock_chv(self, opts):
|
def do_unblock_chv(self, opts):
|
||||||
"""Unblock PIN code using specified PUK code"""
|
"""Unblock PIN code using specified PUK code"""
|
||||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||||
puk = self.get_code(opts.PUK, "PUK" + str(opts.pin_nr))
|
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||||
|
puk = self.get_code(opts.PUK, "PUK" + str(pin_nr))
|
||||||
(data, sw) = self._cmd.lchan.scc.unblock_chv(
|
(data, sw) = self._cmd.lchan.scc.unblock_chv(
|
||||||
opts.pin_nr, h2b(puk), h2b(new_pin))
|
pin_nr, h2b(puk), h2b(new_pin))
|
||||||
self._cmd.poutput("CHV unblock successful")
|
self._cmd.poutput("CHV unblock successful")
|
||||||
|
|
||||||
change_chv_parser = argparse.ArgumentParser()
|
change_chv_parser = argparse.ArgumentParser()
|
||||||
@@ -952,42 +1008,42 @@ class Iso7816Commands(CommandSet):
|
|||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
change_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
change_chv_parser.add_argument(
|
__add_pin_nr_to_ArgumentParser(change_chv_parser)
|
||||||
'--pin-nr', type=int, default=1, help='PUK Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
|
||||||
|
|
||||||
@cmd2.with_argparser(change_chv_parser)
|
@cmd2.with_argparser(change_chv_parser)
|
||||||
def do_change_chv(self, opts):
|
def do_change_chv(self, opts):
|
||||||
"""Change PIN code to a new PIN code"""
|
"""Change PIN code to a new PIN code"""
|
||||||
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(opts.pin_nr))
|
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
new_pin = self.get_code(opts.NEWPIN, "PIN" + str(pin_nr))
|
||||||
|
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||||
(data, sw) = self._cmd.lchan.scc.change_chv(
|
(data, sw) = self._cmd.lchan.scc.change_chv(
|
||||||
opts.pin_nr, h2b(pin), h2b(new_pin))
|
pin_nr, h2b(pin), h2b(new_pin))
|
||||||
self._cmd.poutput("CHV change successful")
|
self._cmd.poutput("CHV change successful")
|
||||||
|
|
||||||
disable_chv_parser = argparse.ArgumentParser()
|
disable_chv_parser = argparse.ArgumentParser()
|
||||||
disable_chv_parser.add_argument(
|
|
||||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
|
||||||
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
disable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
|
__add_pin_nr_to_ArgumentParser(disable_chv_parser)
|
||||||
|
|
||||||
@cmd2.with_argparser(disable_chv_parser)
|
@cmd2.with_argparser(disable_chv_parser)
|
||||||
def do_disable_chv(self, opts):
|
def do_disable_chv(self, opts):
|
||||||
"""Disable PIN code using specified PIN code"""
|
"""Disable PIN code using specified PIN code"""
|
||||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||||
(data, sw) = self._cmd.lchan.scc.disable_chv(opts.pin_nr, h2b(pin))
|
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||||
|
(data, sw) = self._cmd.lchan.scc.disable_chv(pin_nr, h2b(pin))
|
||||||
self._cmd.poutput("CHV disable successful")
|
self._cmd.poutput("CHV disable successful")
|
||||||
|
|
||||||
enable_chv_parser = argparse.ArgumentParser()
|
enable_chv_parser = argparse.ArgumentParser()
|
||||||
enable_chv_parser.add_argument(
|
__add_pin_nr_to_ArgumentParser(enable_chv_parser)
|
||||||
'--pin-nr', type=int, default=1, help='PIN Number, 1=PIN1, 2=PIN2 or custom value (decimal)')
|
|
||||||
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
enable_chv_parser.add_argument('PIN', nargs='?', type=is_decimal,
|
||||||
help='PIN code value. If none given, CSV file will be queried')
|
help='PIN code value. If none given, CSV file will be queried')
|
||||||
|
|
||||||
@cmd2.with_argparser(enable_chv_parser)
|
@cmd2.with_argparser(enable_chv_parser)
|
||||||
def do_enable_chv(self, opts):
|
def do_enable_chv(self, opts):
|
||||||
"""Enable PIN code using specified PIN code"""
|
"""Enable PIN code using specified PIN code"""
|
||||||
pin = self.get_code(opts.PIN, "PIN" + str(opts.pin_nr))
|
pin_nr = self.__select_pin_nr(opts.pin_type, opts.pin_nr)
|
||||||
(data, sw) = self._cmd.lchan.scc.enable_chv(opts.pin_nr, h2b(pin))
|
pin = self.get_code(opts.PIN, "PIN" + str(pin_nr))
|
||||||
|
(data, sw) = self._cmd.lchan.scc.enable_chv(pin_nr, h2b(pin))
|
||||||
self._cmd.poutput("CHV enable successful")
|
self._cmd.poutput("CHV enable successful")
|
||||||
|
|
||||||
def do_deactivate_file(self, opts):
|
def do_deactivate_file(self, opts):
|
||||||
@@ -1071,16 +1127,23 @@ argparse_add_reader_args(option_parser)
|
|||||||
global_group = option_parser.add_argument_group('General Options')
|
global_group = option_parser.add_argument_group('General Options')
|
||||||
global_group.add_argument('--script', metavar='PATH', default=None,
|
global_group.add_argument('--script', metavar='PATH', default=None,
|
||||||
help='script with pySim-shell commands to be executed automatically at start-up')
|
help='script with pySim-shell commands to be executed automatically at start-up')
|
||||||
global_group.add_argument('--csv', metavar='FILE',
|
|
||||||
default=None, help='Read card data from CSV file')
|
|
||||||
global_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
|
||||||
help='per-CSV-column AES transport key')
|
|
||||||
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
|
global_group.add_argument("--card_handler", dest="card_handler_config", metavar="FILE",
|
||||||
help="Use automatic card handling machine")
|
help="Use automatic card handling machine")
|
||||||
global_group.add_argument("--noprompt", help="Run in non interactive mode",
|
global_group.add_argument("--noprompt", help="Run in non interactive mode",
|
||||||
action='store_true', default=False)
|
action='store_true', default=False)
|
||||||
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
|
global_group.add_argument("--skip-card-init", help="Skip all card/profile initialization",
|
||||||
action='store_true', default=False)
|
action='store_true', default=False)
|
||||||
|
global_group.add_argument("--verbose", help="Enable verbose logging",
|
||||||
|
action='store_true', default=False)
|
||||||
|
|
||||||
|
card_key_group = option_parser.add_argument_group('Card Key Provider Options')
|
||||||
|
card_key_group.add_argument('--csv', metavar='FILE',
|
||||||
|
default=str(Path.home()) + "/.osmocom/pysim/card_data.csv",
|
||||||
|
help='Read card data from CSV file')
|
||||||
|
card_key_group.add_argument('--csv-column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
|
help=argparse.SUPPRESS, dest='column_key')
|
||||||
|
card_key_group.add_argument('--column-key', metavar='FIELD:AES_KEY_HEX', default=[], action='append',
|
||||||
|
help='per-column AES transport key', dest='column_key')
|
||||||
|
|
||||||
adm_group = global_group.add_mutually_exclusive_group()
|
adm_group = global_group.add_mutually_exclusive_group()
|
||||||
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
adm_group.add_argument('-a', '--pin-adm', metavar='PIN_ADM1', dest='pin_adm', default=None,
|
||||||
@@ -1095,23 +1158,27 @@ option_parser.add_argument("command", nargs='?',
|
|||||||
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
option_parser.add_argument('command_args', nargs=argparse.REMAINDER,
|
||||||
help="Optional Arguments for command")
|
help="Optional Arguments for command")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
||||||
startup_errors = False
|
startup_errors = False
|
||||||
opts = option_parser.parse_args()
|
opts = option_parser.parse_args()
|
||||||
|
|
||||||
|
# Ensure that we are able to print formatted warnings from the beginning.
|
||||||
|
PySimLogger.setup(print, {logging.WARN: YELLOW})
|
||||||
|
if (opts.verbose):
|
||||||
|
PySimLogger.set_verbose(True)
|
||||||
|
PySimLogger.set_level(logging.DEBUG)
|
||||||
|
else:
|
||||||
|
PySimLogger.set_verbose(False)
|
||||||
|
PySimLogger.set_level(logging.INFO)
|
||||||
|
|
||||||
# Register csv-file as card data provider, either from specified CSV
|
# Register csv-file as card data provider, either from specified CSV
|
||||||
# or from CSV file in home directory
|
# or from CSV file in home directory
|
||||||
csv_column_keys = {}
|
column_keys = {}
|
||||||
for par in opts.csv_column_key:
|
for par in opts.column_key:
|
||||||
name, key = par.split(':')
|
name, key = par.split(':')
|
||||||
csv_column_keys[name] = key
|
column_keys[name] = key
|
||||||
csv_default = str(Path.home()) + "/.osmocom/pysim/card_data.csv"
|
if os.path.isfile(opts.csv):
|
||||||
if opts.csv:
|
card_key_provider_register(CardKeyProviderCsv(opts.csv, column_keys))
|
||||||
card_key_provider_register(CardKeyProviderCsv(opts.csv, csv_column_keys))
|
|
||||||
if os.path.isfile(csv_default):
|
|
||||||
card_key_provider_register(CardKeyProviderCsv(csv_default, csv_column_keys))
|
|
||||||
|
|
||||||
# Init card reader driver
|
# Init card reader driver
|
||||||
sl = init_reader(opts, proactive_handler = Proact())
|
sl = init_reader(opts, proactive_handler = Proact())
|
||||||
@@ -1127,7 +1194,7 @@ if __name__ == '__main__':
|
|||||||
# able to tolerate and recover from that.
|
# able to tolerate and recover from that.
|
||||||
try:
|
try:
|
||||||
rs, card = init_card(sl, opts.skip_card_init)
|
rs, card = init_card(sl, opts.skip_card_init)
|
||||||
app = PysimApp(card, rs, sl, ch)
|
app = PysimApp(opts.verbose, card, rs, sl, ch)
|
||||||
except:
|
except:
|
||||||
startup_errors = True
|
startup_errors = True
|
||||||
print("Card initialization (%s) failed with an exception:" % str(sl))
|
print("Card initialization (%s) failed with an exception:" % str(sl))
|
||||||
@@ -1139,7 +1206,7 @@ if __name__ == '__main__':
|
|||||||
print(" it should also be noted that some readers may behave strangely when no card")
|
print(" it should also be noted that some readers may behave strangely when no card")
|
||||||
print(" is inserted.)")
|
print(" is inserted.)")
|
||||||
print("")
|
print("")
|
||||||
app = PysimApp(None, None, sl, ch)
|
app = PysimApp(opts.verbose, None, None, sl, ch)
|
||||||
|
|
||||||
# If the user supplies an ADM PIN at via commandline args authenticate
|
# If the user supplies an ADM PIN at via commandline args authenticate
|
||||||
# immediately so that the user does not have to use the shell commands
|
# immediately so that the user does not have to use the shell commands
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ from pySim.cards import UiccCardBase
|
|||||||
from pySim.exceptions import *
|
from pySim.exceptions import *
|
||||||
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
|
from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload, BearerDescription
|
||||||
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
|
from pySim.cat import DeviceIdentities, Address, OtherAddress, UiccTransportLevel, BufferSize
|
||||||
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength
|
from pySim.cat import ChannelStatus, ChannelData, ChannelDataLength, EventDownload, EventList
|
||||||
from pySim.utils import b2h, h2b
|
from pySim.utils import b2h, h2b
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -71,24 +71,46 @@ class MyApduTracer(ApduTracer):
|
|||||||
print("-> %s %s" % (cmd[:10], cmd[10:]))
|
print("-> %s %s" % (cmd[:10], cmd[10:]))
|
||||||
print("<- %s: %s" % (sw, resp))
|
print("<- %s: %s" % (sw, resp))
|
||||||
|
|
||||||
class TcpProtocol(protocol.Protocol):
|
|
||||||
def dataReceived(self, data):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def tcp_connected_callback(p: protocol.Protocol):
|
def tcp_connected_callback(p: protocol.Protocol):
|
||||||
"""called by twisted TCP client."""
|
"""called by twisted TCP client."""
|
||||||
logger.error("%s: connected!" % p)
|
logger.error("%s: connected!" % p)
|
||||||
|
for data in p.pending_tx:
|
||||||
|
p.transport.write(data)
|
||||||
|
|
||||||
class ProactChannel:
|
class ProactChannel(protocol.Protocol):
|
||||||
"""Representation of a single proective channel."""
|
"""Representation of a single proective channel."""
|
||||||
def __init__(self, channels: 'ProactChannels', chan_nr: int):
|
def __init__(self, channels: 'ProactChannels', chan_nr: int):
|
||||||
self.channels = channels
|
self.channels = channels
|
||||||
self.chan_nr = chan_nr
|
self.chan_nr = chan_nr
|
||||||
self.ep = None
|
self.ep = None
|
||||||
|
self.pending_tx = []
|
||||||
|
self.pending_rx = bytearray()
|
||||||
|
|
||||||
|
def write(self, data: bytes):
|
||||||
|
if self.connected:
|
||||||
|
self.transport.write(data)
|
||||||
|
else:
|
||||||
|
self.pending_tx.append(data)
|
||||||
|
|
||||||
|
def dataReceived(self, data: bytes):
|
||||||
|
logger.error(f"Got data (len={len(data)}): {data}")
|
||||||
|
self.pending_rx.extend(data)
|
||||||
|
# Send ENVELOPE with EventDownload Data available
|
||||||
|
event_list_ie = EventList(decoded=[ EventList.Event.data_available])
|
||||||
|
channel_status_ie = ChannelStatus(decoded='8100')
|
||||||
|
channel_data_len_ie = ChannelDataLength(decoded=min(255,len(self.pending_rx)))
|
||||||
|
dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
|
||||||
|
event_dl = EventDownload(children=[event_list_ie, dev_ids, channel_status_ie, channel_data_len_ie])
|
||||||
|
# 3) send to the card
|
||||||
|
envelope_hex = b2h(event_dl.to_tlv())
|
||||||
|
logger.info("ENVELOPE Event: %s" % envelope_hex)
|
||||||
|
global g_ms
|
||||||
|
(data, sw) = g_ms.scc.envelope(envelope_hex)
|
||||||
|
logger.info("SW %s: %s" % (sw, data))
|
||||||
|
# FIXME: Handle result?!
|
||||||
|
|
||||||
|
def connectionLost(self, reason):
|
||||||
|
logger.error("connection lost: %s" % reason)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""Close the channel."""
|
"""Close the channel."""
|
||||||
@@ -174,14 +196,13 @@ class Proact(ProactiveHandler):
|
|||||||
raise ValueError('Unsupported protocol_type')
|
raise ValueError('Unsupported protocol_type')
|
||||||
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
|
if other_addr_ie.decoded.get('type_of_address', None) != 'ipv4':
|
||||||
raise ValueError('Unsupported type_of_address')
|
raise ValueError('Unsupported type_of_address')
|
||||||
ipv4_bytes = h2b(other_addr_ie.decoded['address'])
|
ipv4_bytes = other_addr_ie.decoded['address']
|
||||||
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
|
ipv4_str = '%u.%u.%u.%u' % (ipv4_bytes[0], ipv4_bytes[1], ipv4_bytes[2], ipv4_bytes[3])
|
||||||
port_nr = transp_lvl_ie.decoded['port_number']
|
port_nr = transp_lvl_ie.decoded['port_number']
|
||||||
print("%s:%u" % (ipv4_str, port_nr))
|
logger.error("OpenChannel opening with %s:%u" % (ipv4_str, port_nr))
|
||||||
channel = self.channels.channel_create()
|
channel = self.channels.channel_create()
|
||||||
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
|
channel.ep = endpoints.TCP4ClientEndpoint(reactor, ipv4_str, port_nr)
|
||||||
channel.prot = TcpProtocol()
|
d = endpoints.connectProtocol(channel.ep, channel)
|
||||||
d = endpoints.connectProtocol(channel.ep, channel.prot)
|
|
||||||
# FIXME: why is this never called despite the client showing the inbound connection?
|
# FIXME: why is this never called despite the client showing the inbound connection?
|
||||||
d.addCallback(tcp_connected_callback)
|
d.addCallback(tcp_connected_callback)
|
||||||
|
|
||||||
@@ -213,6 +234,17 @@ class Proact(ProactiveHandler):
|
|||||||
# ]}
|
# ]}
|
||||||
logger.info("ReceiveData")
|
logger.info("ReceiveData")
|
||||||
logger.info(pcmd)
|
logger.info(pcmd)
|
||||||
|
dev_id_ie = Proact._find_first_element_of_type(pcmd.children, DeviceIdentities)
|
||||||
|
chan_data_len_ie = Proact._find_first_element_of_type(pcmd.children, ChannelDataLength)
|
||||||
|
len_requested = chan_data_len_ie.decoded
|
||||||
|
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||||
|
chan_nr = 1 # FIXME
|
||||||
|
chan = self.channels.channels.get(chan_nr, None)
|
||||||
|
|
||||||
|
requested = chan.pending_rx[:len_requested]
|
||||||
|
chan.pending_rx = chan.pending_rx[len_requested:]
|
||||||
|
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||||
|
|
||||||
# Terminal Response example: [
|
# Terminal Response example: [
|
||||||
# {'command_details': {'command_number': 1,
|
# {'command_details': {'command_number': 1,
|
||||||
# 'type_of_command': 'receive_data',
|
# 'type_of_command': 'receive_data',
|
||||||
@@ -222,7 +254,8 @@ class Proact(ProactiveHandler):
|
|||||||
# {'channel_data': '16030100040e000000'},
|
# {'channel_data': '16030100040e000000'},
|
||||||
# {'channel_data_length': 0}
|
# {'channel_data_length': 0}
|
||||||
# ]
|
# ]
|
||||||
return self.prepare_response(pcmd) + []
|
resp = self.prepare_response(pcmd) + [ChannelData(decoded=requested), ChannelDataLength(decoded=min(255, len(chan.pending_rx)))]
|
||||||
|
return resp
|
||||||
|
|
||||||
def handle_SendData(self, pcmd: ProactiveCommand):
|
def handle_SendData(self, pcmd: ProactiveCommand):
|
||||||
"""Send/write data received from the SIM to the socket."""
|
"""Send/write data received from the SIM to the socket."""
|
||||||
@@ -240,7 +273,10 @@ class Proact(ProactiveHandler):
|
|||||||
chan_str = dev_id_ie.decoded['dest_dev_id']
|
chan_str = dev_id_ie.decoded['dest_dev_id']
|
||||||
chan_nr = 1 # FIXME
|
chan_nr = 1 # FIXME
|
||||||
chan = self.channels.channels.get(chan_nr, None)
|
chan = self.channels.channels.get(chan_nr, None)
|
||||||
# FIXME chan.prot.transport.write(h2b(chan_data_ie.decoded))
|
# FIXME
|
||||||
|
logger.error(f"Chan data received: {chan_data_ie.decoded}")
|
||||||
|
chan.write(chan_data_ie.decoded)
|
||||||
|
#chan.write(h2b(chan_data_ie.decoded))
|
||||||
# Terminal Response example: [
|
# Terminal Response example: [
|
||||||
# {'command_details': {'command_number': 1,
|
# {'command_details': {'command_number': 1,
|
||||||
# 'type_of_command': 'send_data',
|
# 'type_of_command': 'send_data',
|
||||||
@@ -425,4 +461,3 @@ if __name__ == '__main__':
|
|||||||
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
|
g_ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip, opts.smpp_system_id, opts.smpp_password)
|
||||||
g_ms.connect_to_card(tp)
|
g_ms.connect_to_card(tp)
|
||||||
reactor.run()
|
reactor.run()
|
||||||
|
|
||||||
|
|||||||
@@ -151,7 +151,7 @@ global_group.add_argument('--no-suppress-select', action='store_false', dest='su
|
|||||||
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
|
global_group.add_argument('--no-suppress-status', action='store_false', dest='suppress_status',
|
||||||
help="""
|
help="""
|
||||||
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
|
Don't suppress displaying STATUS APDUs. We normally suppress them as they don't provide any
|
||||||
information that was not already received in resposne to the most recent SEELCT.""")
|
information that was not already received in response to the most recent SEELCT.""")
|
||||||
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
|
global_group.add_argument('--show-raw-apdu', action='store_true', dest='show_raw_apdu',
|
||||||
help="""Show the raw APDU in addition to its parsed form.""")
|
help="""Show the raw APDU in addition to its parsed form.""")
|
||||||
|
|
||||||
@@ -188,7 +188,7 @@ parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
|
|||||||
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
parser_tcaloader_log = subparsers.add_parser('tca-loader-log', help="""
|
||||||
Read APDUs from a TCA Loader log file.""")
|
Read APDUs from a TCA Loader log file.""")
|
||||||
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
parser_tcaloader_log.add_argument('-f', '--log-file', required=True,
|
||||||
help='Name of te log file to be read')
|
help='Name of the log file to be read')
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
|||||||
@@ -317,7 +317,7 @@ class ADF_ARAM(CardADF):
|
|||||||
store_ref_ar_do_parse = argparse.ArgumentParser()
|
store_ref_ar_do_parse = argparse.ArgumentParser()
|
||||||
# REF-DO
|
# REF-DO
|
||||||
store_ref_ar_do_parse.add_argument(
|
store_ref_ar_do_parse.add_argument(
|
||||||
'--device-app-id', required=True, help='Identifies the specific device application that the rule appplies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
'--device-app-id', required=True, help='Identifies the specific device application that the rule applies to. Hash of Certificate of Application Provider, or UUID. (20/32 hex bytes)')
|
||||||
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
aid_grp = store_ref_ar_do_parse.add_mutually_exclusive_group()
|
||||||
aid_grp.add_argument(
|
aid_grp.add_argument(
|
||||||
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
|
'--aid', help='Identifies the specific SE application for which rules are to be stored. Can be a partial AID, containing for example only the RID. (5-16 or 0 hex bytes)')
|
||||||
@@ -399,7 +399,7 @@ class ADF_ARAM(CardADF):
|
|||||||
sw_aram = {
|
sw_aram = {
|
||||||
'ARA-M': {
|
'ARA-M': {
|
||||||
'6381': 'Rule successfully stored but an access rule already exists',
|
'6381': 'Rule successfully stored but an access rule already exists',
|
||||||
'6382': 'Rule successfully stored bu contained at least one unknown (discarded) BER-TLV',
|
'6382': 'Rule successfully stored but contained at least one unknown (discarded) BER-TLV',
|
||||||
'6581': 'Memory Problem',
|
'6581': 'Memory Problem',
|
||||||
'6700': 'Wrong Length in Lc',
|
'6700': 'Wrong Length in Lc',
|
||||||
'6981': 'DO is not supported by the ARA-M/ARA-C',
|
'6981': 'DO is not supported by the ARA-M/ARA-C',
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
|
|||||||
operation with pySim-shell.
|
operation with pySim-shell.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
|
# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Author: Philipp Maier, Harald Welte
|
# Author: Philipp Maier, Harald Welte
|
||||||
@@ -31,128 +31,161 @@ operation with pySim-shell.
|
|||||||
from typing import List, Dict, Optional
|
from typing import List, Dict, Optional
|
||||||
from Cryptodome.Cipher import AES
|
from Cryptodome.Cipher import AES
|
||||||
from osmocom.utils import h2b, b2h
|
from osmocom.utils import h2b, b2h
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import csv
|
import csv
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = PySimLogger.get("CARDKEY")
|
||||||
|
|
||||||
card_key_providers = [] # type: List['CardKeyProvider']
|
card_key_providers = [] # type: List['CardKeyProvider']
|
||||||
|
|
||||||
# well-known groups of columns relate to a given functionality. This avoids having
|
class CardKeyFieldCryptor:
|
||||||
# to specify the same transport key N number of times, if the same key is used for multiple
|
"""
|
||||||
# fields of one group, like KIC+KID+KID of one SD.
|
A Card key field encryption class that may be used by Card key provider implementations to add support for
|
||||||
CRYPT_GROUPS = {
|
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.).
|
||||||
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key"
|
||||||
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt
|
||||||
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
the key material on demand.
|
||||||
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
"""
|
||||||
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
|
||||||
|
# well-known groups of columns relate to a given functionality. This avoids having
|
||||||
|
# to specify the same transport key N number of times, if the same key is used for multiple
|
||||||
|
# fields of one group, like KIC+KID+KID of one SD.
|
||||||
|
__CRYPT_GROUPS = {
|
||||||
|
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
|
||||||
|
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
|
||||||
|
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
|
||||||
|
'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
|
||||||
|
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
|
||||||
}
|
}
|
||||||
|
|
||||||
class CardKeyProvider(abc.ABC):
|
__IV = b'\x23' * 16
|
||||||
"""Base class, not containing any concrete implementation."""
|
|
||||||
|
|
||||||
VALID_KEY_FIELD_NAMES = ['ICCID', 'EID', 'IMSI' ]
|
|
||||||
|
|
||||||
# check input parameters, but do nothing concrete yet
|
|
||||||
def _verify_get_data(self, fields: List[str] = [], key: str = 'ICCID', value: str = "") -> Dict[str, str]:
|
|
||||||
"""Verify multiple fields for identified card.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
|
||||||
key : look-up key to identify card data, such as 'ICCID'
|
|
||||||
value : value for look-up key to identify card data
|
|
||||||
Returns:
|
|
||||||
dictionary of {field, value} strings for each requested field from 'fields'
|
|
||||||
"""
|
|
||||||
|
|
||||||
if key not in self.VALID_KEY_FIELD_NAMES:
|
|
||||||
raise ValueError("Key field name '%s' is not a valid field name, valid field names are: %s" %
|
|
||||||
(key, str(self.VALID_KEY_FIELD_NAMES)))
|
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
def get_field(self, field: str, key: str = 'ICCID', value: str = "") -> Optional[str]:
|
|
||||||
"""get a single field from CSV file using a specified key/value pair"""
|
|
||||||
fields = [field]
|
|
||||||
result = self.get(fields, key, value)
|
|
||||||
return result.get(field)
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
|
||||||
"""Get multiple card-individual fields for identified card.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
|
||||||
key : look-up key to identify card data, such as 'ICCID'
|
|
||||||
value : value for look-up key to identify card data
|
|
||||||
Returns:
|
|
||||||
dictionary of {field, value} strings for each requested field from 'fields'
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class CardKeyProviderCsv(CardKeyProvider):
|
|
||||||
"""Card key provider implementation that allows to query against a specified CSV file.
|
|
||||||
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
|
|
||||||
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
|
|
||||||
known as "transport key" (see GSMA FS.28)."""
|
|
||||||
IV = b'\x23' * 16
|
|
||||||
csv_file = None
|
|
||||||
filename = None
|
|
||||||
|
|
||||||
def __init__(self, filename: str, transport_keys: dict):
|
|
||||||
"""
|
|
||||||
Args:
|
|
||||||
filename : file name (path) of CSV file containing card-individual key/data
|
|
||||||
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
|
||||||
respective field (column) of the CSV. This is done so that different fields
|
|
||||||
(columns) can use different transport keys, which is strongly recommended by
|
|
||||||
GSMA FS.28
|
|
||||||
"""
|
|
||||||
self.csv_file = open(filename, 'r')
|
|
||||||
if not self.csv_file:
|
|
||||||
raise RuntimeError("Could not open CSV file '%s'" % filename)
|
|
||||||
self.filename = filename
|
|
||||||
self.transport_keys = self.process_transport_keys(transport_keys)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def process_transport_keys(transport_keys: dict):
|
def __dict_keys_to_upper(d: dict) -> dict:
|
||||||
|
return {k.upper():v for k,v in d.items()}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
|
||||||
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
|
||||||
new_dict = {}
|
new_dict = {}
|
||||||
for name, key in transport_keys.items():
|
for name, key in transport_keys.items():
|
||||||
if name in CRYPT_GROUPS:
|
if name in crypt_groups:
|
||||||
for field in CRYPT_GROUPS[name]:
|
for field in crypt_groups[name]:
|
||||||
new_dict[field] = key
|
new_dict[field] = key
|
||||||
else:
|
else:
|
||||||
new_dict[name] = key
|
new_dict[name] = key
|
||||||
return new_dict
|
return new_dict
|
||||||
|
|
||||||
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
def __init__(self, transport_keys: dict):
|
||||||
"""decrypt a single field, if we have a transport key for the field of that name."""
|
"""
|
||||||
if not field_name in self.transport_keys:
|
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
|
||||||
|
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
|
||||||
|
respective field (column) of the CSV. This is done so that different fields
|
||||||
|
(columns) can use different transport keys, which is strongly recommended by
|
||||||
|
GSMA FS.28
|
||||||
|
"""
|
||||||
|
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
|
||||||
|
self.__CRYPT_GROUPS)
|
||||||
|
for name, key in self.transport_keys.items():
|
||||||
|
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
|
||||||
|
|
||||||
|
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
|
||||||
|
"""
|
||||||
|
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
|
||||||
|
field name, otherwise the field is treated as plaintext and passed through as it is.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
field_name : name of the field to decrypt (used to identify which key to use)
|
||||||
|
encrypted_val : encrypted field value
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
plaintext field value
|
||||||
|
"""
|
||||||
|
if not field_name.upper() in self.transport_keys:
|
||||||
return encrypted_val
|
return encrypted_val
|
||||||
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
|
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||||
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
return b2h(cipher.decrypt(h2b(encrypted_val)))
|
||||||
|
|
||||||
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
|
||||||
super()._verify_get_data(fields, key, value)
|
"""
|
||||||
|
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
|
||||||
|
field name, otherwise the field is treated as non sensitive and passed through as it is.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
field_name : name of the field to decrypt (used to identify which key to use)
|
||||||
|
encrypted_val : encrypted field value
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
plaintext field value
|
||||||
|
"""
|
||||||
|
if not field_name.upper() in self.transport_keys:
|
||||||
|
return plaintext_val
|
||||||
|
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
|
||||||
|
return b2h(cipher.encrypt(h2b(plaintext_val)))
|
||||||
|
|
||||||
|
class CardKeyProvider(abc.ABC):
|
||||||
|
"""Base class, not containing any concrete implementation."""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||||
|
"""
|
||||||
|
Get multiple card-individual fields for identified card. This method should not fail with an exception in
|
||||||
|
case the entry, columns or even the key column itsself is not found.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fields : list of valid field names such as 'ADM1', 'PIN1', ... which are to be obtained
|
||||||
|
key : look-up key to identify card data, such as 'ICCID'
|
||||||
|
value : value for look-up key to identify card data
|
||||||
|
Returns:
|
||||||
|
dictionary of {field : value, ...} strings for each requested field from 'fields'. In case nothing is
|
||||||
|
fond None shall be returned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return type(self).__name__
|
||||||
|
|
||||||
|
class CardKeyProviderCsv(CardKeyProvider):
|
||||||
|
"""Card key provider implementation that allows to query against a specified CSV file."""
|
||||||
|
|
||||||
|
def __init__(self, csv_filename: str, transport_keys: dict):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
csv_filename : file name (path) of CSV file containing card-individual key/data
|
||||||
|
transport_keys : (see class CardKeyFieldCryptor)
|
||||||
|
"""
|
||||||
|
self.csv_file = open(csv_filename, 'r')
|
||||||
|
if not self.csv_file:
|
||||||
|
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
|
||||||
|
self.csv_filename = csv_filename
|
||||||
|
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||||
|
|
||||||
|
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
|
||||||
self.csv_file.seek(0)
|
self.csv_file.seek(0)
|
||||||
cr = csv.DictReader(self.csv_file)
|
cr = csv.DictReader(self.csv_file)
|
||||||
if not cr:
|
if not cr:
|
||||||
raise RuntimeError(
|
raise RuntimeError("Could not open DictReader for CSV-File '%s'" % self.csv_filename)
|
||||||
"Could not open DictReader for CSV-File '%s'" % self.filename)
|
|
||||||
cr.fieldnames = [field.upper() for field in cr.fieldnames]
|
cr.fieldnames = [field.upper() for field in cr.fieldnames]
|
||||||
|
|
||||||
rc = {}
|
if key not in cr.fieldnames:
|
||||||
|
return None
|
||||||
|
return_dict = {}
|
||||||
for row in cr:
|
for row in cr:
|
||||||
if row[key] == value:
|
if row[key] == value:
|
||||||
for f in fields:
|
for f in fields:
|
||||||
if f in row:
|
if f in row:
|
||||||
rc.update({f: self._decrypt_field(f, row[f])})
|
return_dict.update({f: self.crypt.decrypt_field(f, row[f])})
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("CSV-File '%s' lacks column '%s'" %
|
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.csv_filename, f))
|
||||||
(self.filename, f))
|
if return_dict == {}:
|
||||||
return rc
|
return None
|
||||||
|
return return_dict
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):
|
||||||
@@ -163,11 +196,11 @@ def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key
|
|||||||
provider_list : override the list of providers from the global default
|
provider_list : override the list of providers from the global default
|
||||||
"""
|
"""
|
||||||
if not isinstance(provider, CardKeyProvider):
|
if not isinstance(provider, CardKeyProvider):
|
||||||
raise ValueError("provider is not a card data provier")
|
raise ValueError("provider is not a card data provider")
|
||||||
provider_list.append(provider)
|
provider_list.append(provider)
|
||||||
|
|
||||||
|
|
||||||
def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
def card_key_provider_get(fields: list[str], key: str, value: str, provider_list=card_key_providers) -> Dict[str, str]:
|
||||||
"""Query all registered card data providers for card-individual [key] data.
|
"""Query all registered card data providers for card-individual [key] data.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -178,17 +211,21 @@ def card_key_provider_get(fields, key: str, value: str, provider_list=card_key_p
|
|||||||
Returns:
|
Returns:
|
||||||
dictionary of {field, value} strings for each requested field from 'fields'
|
dictionary of {field, value} strings for each requested field from 'fields'
|
||||||
"""
|
"""
|
||||||
|
key = key.upper()
|
||||||
|
fields = [f.upper() for f in fields]
|
||||||
for p in provider_list:
|
for p in provider_list:
|
||||||
if not isinstance(p, CardKeyProvider):
|
if not isinstance(p, CardKeyProvider):
|
||||||
raise ValueError(
|
raise ValueError("Provider list contains element which is not a card data provider")
|
||||||
"provider list contains element which is not a card data provier")
|
log.debug("Searching for card key data (key=%s, value=%s, provider=%s)" % (key, value, str(p)))
|
||||||
result = p.get(fields, key, value)
|
result = p.get(fields, key, value)
|
||||||
if result:
|
if result:
|
||||||
|
log.debug("Found card data: %s" % (str(result)))
|
||||||
return result
|
return result
|
||||||
return {}
|
|
||||||
|
raise ValueError("Unable to find card key data (key=%s, value=%s, fields=%s)" % (key, value, str(fields)))
|
||||||
|
|
||||||
|
|
||||||
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> Optional[str]:
|
def card_key_provider_get_field(field: str, key: str, value: str, provider_list=card_key_providers) -> str:
|
||||||
"""Query all registered card data providers for a single field.
|
"""Query all registered card data providers for a single field.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -199,11 +236,7 @@ def card_key_provider_get_field(field: str, key: str, value: str, provider_list=
|
|||||||
Returns:
|
Returns:
|
||||||
dictionary of {field, value} strings for the requested field
|
dictionary of {field, value} strings for the requested field
|
||||||
"""
|
"""
|
||||||
for p in provider_list:
|
|
||||||
if not isinstance(p, CardKeyProvider):
|
fields = [field]
|
||||||
raise ValueError(
|
result = card_key_provider_get(fields, key, value, card_key_providers)
|
||||||
"provider list contains element which is not a card data provier")
|
return result.get(field.upper())
|
||||||
result = p.get_field(field, key, value)
|
|
||||||
if result:
|
|
||||||
return result
|
|
||||||
return None
|
|
||||||
|
|||||||
@@ -112,6 +112,8 @@ class UiccCardBase(SimCardBase):
|
|||||||
def probe(self) -> bool:
|
def probe(self) -> bool:
|
||||||
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
|
# EF.DIR is a mandatory EF on all ICCIDs; however it *may* also exist on a TS 51.011 SIM
|
||||||
ef_dir = EF_DIR()
|
ef_dir = EF_DIR()
|
||||||
|
# select MF first
|
||||||
|
self.file_exists("3f00")
|
||||||
return self.file_exists(ef_dir.fid)
|
return self.file_exists(ef_dir.fid)
|
||||||
|
|
||||||
def read_aids(self) -> List[Hexstr]:
|
def read_aids(self) -> List[Hexstr]:
|
||||||
|
|||||||
@@ -316,19 +316,19 @@ class FileList(COMPR_TLV_IE, tag=0x92):
|
|||||||
_construct = Struct('number_of_files'/Int8ub,
|
_construct = Struct('number_of_files'/Int8ub,
|
||||||
'files'/GreedyRange(FileId))
|
'files'/GreedyRange(FileId))
|
||||||
|
|
||||||
# TS 102 223 Secton 8.19
|
# TS 102 223 Section 8.19
|
||||||
class LocationInformation(COMPR_TLV_IE, tag=0x93):
|
class LocationInformation(COMPR_TLV_IE, tag=0x93):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# TS 102 223 Secton 8.20
|
# TS 102 223 Section 8.20
|
||||||
class IMEI(COMPR_TLV_IE, tag=0x94):
|
class IMEI(COMPR_TLV_IE, tag=0x94):
|
||||||
_construct = BcdAdapter(GreedyBytes)
|
_construct = BcdAdapter(GreedyBytes)
|
||||||
|
|
||||||
# TS 102 223 Secton 8.21
|
# TS 102 223 Section 8.21
|
||||||
class HelpRequest(COMPR_TLV_IE, tag=0x95):
|
class HelpRequest(COMPR_TLV_IE, tag=0x95):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# TS 102 223 Secton 8.22
|
# TS 102 223 Section 8.22
|
||||||
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
|
class NetworkMeasurementResults(COMPR_TLV_IE, tag=0x96):
|
||||||
_construct = BcdAdapter(GreedyBytes)
|
_construct = BcdAdapter(GreedyBytes)
|
||||||
|
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ class SimCardCommands:
|
|||||||
Returns:
|
Returns:
|
||||||
Tuple of (decoded_data, sw)
|
Tuple of (decoded_data, sw)
|
||||||
"""
|
"""
|
||||||
cmd = cmd_constr.build(cmd_data) if cmd_data else ''
|
cmd = cmd_constr.build(cmd_data) if cmd_data else b''
|
||||||
lc = i2h([len(cmd)]) if cmd_data else ''
|
lc = i2h([len(cmd)]) if cmd_data else ''
|
||||||
le = '00' if resp_constr else ''
|
le = '00' if resp_constr else ''
|
||||||
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
|
pdu = ''.join([cla, ins, p1, p2, lc, b2h(cmd), le])
|
||||||
@@ -285,7 +285,7 @@ class SimCardCommands:
|
|||||||
return self.send_apdu_checksw(self.cla_byte + "a40304")
|
return self.send_apdu_checksw(self.cla_byte + "a40304")
|
||||||
|
|
||||||
def select_adf(self, aid: Hexstr) -> ResTuple:
|
def select_adf(self, aid: Hexstr) -> ResTuple:
|
||||||
"""Execute SELECT a given Applicaiton ADF.
|
"""Execute SELECT a given Application ADF.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
aid : application identifier as hex string
|
aid : application identifier as hex string
|
||||||
@@ -577,7 +577,7 @@ class SimCardCommands:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
rand : 16 byte random data as hex string (RAND)
|
rand : 16 byte random data as hex string (RAND)
|
||||||
autn : 8 byte Autentication Token (AUTN)
|
autn : 8 byte Authentication Token (AUTN)
|
||||||
context : 16 byte random data ('3g' or 'gsm')
|
context : 16 byte random data ('3g' or 'gsm')
|
||||||
"""
|
"""
|
||||||
# 3GPP TS 31.102 Section 7.1.2.1
|
# 3GPP TS 31.102 Section 7.1.2.1
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ class BspAlgoCrypt(BspAlgo, abc.ABC):
|
|||||||
block_nr = self.block_nr
|
block_nr = self.block_nr
|
||||||
ciphertext = self._encrypt(padded_data)
|
ciphertext = self._encrypt(padded_data)
|
||||||
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
logger.debug("encrypt(block_nr=%u, s_enc=%s, plaintext=%s, padded=%s) -> %s",
|
||||||
block_nr, b2h(self.s_enc), b2h(data), b2h(padded_data), b2h(ciphertext))
|
block_nr, b2h(self.s_enc)[:20], b2h(data)[:20], b2h(padded_data)[:20], b2h(ciphertext)[:20])
|
||||||
return ciphertext
|
return ciphertext
|
||||||
|
|
||||||
def decrypt(self, data:bytes) -> bytes:
|
def decrypt(self, data:bytes) -> bytes:
|
||||||
@@ -149,10 +149,20 @@ class BspAlgoMac(BspAlgo, abc.ABC):
|
|||||||
temp_data = self.mac_chain + tag_and_length + data
|
temp_data = self.mac_chain + tag_and_length + data
|
||||||
old_mcv = self.mac_chain
|
old_mcv = self.mac_chain
|
||||||
c_mac = self._auth(temp_data)
|
c_mac = self._auth(temp_data)
|
||||||
|
|
||||||
|
# DEBUG: Show MAC computation details
|
||||||
|
logger.debug(f"MAC_DEBUG: tag=0x{tag:02x}, lcc={lcc}")
|
||||||
|
logger.debug(f"MAC_DEBUG: tag_and_length: {tag_and_length.hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: mac_chain[:20]: {old_mcv[:20].hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: temp_data[:20]: {temp_data[:20].hex()}")
|
||||||
|
logger.debug(f"MAC_DEBUG: c_mac: {c_mac.hex()}")
|
||||||
|
|
||||||
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
# The output data is computed by concatenating the following data: the tag, the final length, the result of step 2 and the C-MAC value.
|
||||||
ret = tag_and_length + data + c_mac
|
ret = tag_and_length + data + c_mac
|
||||||
|
logger.debug(f"MAC_DEBUG: final_output[:20]: {ret[:20].hex()}")
|
||||||
|
|
||||||
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
logger.debug("auth(tag=0x%x, mcv=%s, s_mac=%s, plaintext=%s, temp=%s) -> %s",
|
||||||
tag, b2h(old_mcv), b2h(self.s_mac), b2h(data), b2h(temp_data), b2h(ret))
|
tag, b2h(old_mcv)[:20], b2h(self.s_mac)[:20], b2h(data)[:20], b2h(temp_data)[:20], b2h(ret)[:20])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def verify(self, ciphertext: bytes) -> bool:
|
def verify(self, ciphertext: bytes) -> bool:
|
||||||
@@ -204,6 +214,11 @@ def bsp_key_derivation(shared_secret: bytes, key_type: int, key_length: int, hos
|
|||||||
s_enc = out[l:2*l]
|
s_enc = out[l:2*l]
|
||||||
s_mac = out[l*2:3*l]
|
s_mac = out[l*2:3*l]
|
||||||
|
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: kdf_out = {b2h(out)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: initial_mcv = {b2h(initial_mac_chaining_value)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: s_enc = {b2h(s_enc)}")
|
||||||
|
logger.debug(f"BSP_KDF_DEBUG: s_mac = {b2h(s_mac)}")
|
||||||
|
|
||||||
return s_enc, s_mac, initial_mac_chaining_value
|
return s_enc, s_mac, initial_mac_chaining_value
|
||||||
|
|
||||||
|
|
||||||
@@ -231,9 +246,21 @@ class BspInstance:
|
|||||||
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
"""Encrypt + MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||||
assert tag <= 255
|
assert tag <= 255
|
||||||
assert len(plaintext) <= self.max_payload_size
|
assert len(plaintext) <= self.max_payload_size
|
||||||
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext))
|
|
||||||
|
# DEBUG: Show what we're processing
|
||||||
|
logger.debug(f"BSP_DEBUG: encrypt_and_mac_one(tag=0x{tag:02x}, plaintext_len={len(plaintext)})")
|
||||||
|
logger.debug(f"BSP_DEBUG: plaintext[:20]: {plaintext[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: s_enc[:20]: {self.c_algo.s_enc[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: s_mac[:20]: {self.m_algo.s_mac[:20].hex()}")
|
||||||
|
|
||||||
|
logger.debug("encrypt_and_mac_one(tag=0x%x, plaintext=%s)", tag, b2h(plaintext)[:20])
|
||||||
ciphered = self.c_algo.encrypt(plaintext)
|
ciphered = self.c_algo.encrypt(plaintext)
|
||||||
|
logger.debug(f"BSP_DEBUG: ciphered[:20]: {ciphered[:20].hex()}")
|
||||||
|
|
||||||
maced = self.m_algo.auth(tag, ciphered)
|
maced = self.m_algo.auth(tag, ciphered)
|
||||||
|
logger.debug(f"BSP_DEBUG: final_result[:20]: {maced[:20].hex()}")
|
||||||
|
logger.debug(f"BSP_DEBUG: final_result_len: {len(maced)}")
|
||||||
|
|
||||||
return maced
|
return maced
|
||||||
|
|
||||||
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
def encrypt_and_mac(self, tag: int, plaintext:bytes) -> List[bytes]:
|
||||||
@@ -255,7 +282,7 @@ class BspInstance:
|
|||||||
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
|
def mac_only_one(self, tag: int, plaintext: bytes) -> bytes:
|
||||||
"""MAC a single plaintext TLV. Returns the protected ciphertext."""
|
"""MAC a single plaintext TLV. Returns the protected ciphertext."""
|
||||||
assert tag <= 255
|
assert tag <= 255
|
||||||
assert len(plaintext) < self.max_payload_size
|
assert len(plaintext) <= self.max_payload_size
|
||||||
maced = self.m_algo.auth(tag, plaintext)
|
maced = self.m_algo.auth(tag, plaintext)
|
||||||
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
|
# The data block counter for ICV calculation is incremented also for each segment with C-MAC only.
|
||||||
self.c_algo.block_nr += 1
|
self.c_algo.block_nr += 1
|
||||||
|
|||||||
@@ -116,7 +116,7 @@ class param:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
class Es2PlusApiFunction(JsonHttpApiFunction):
|
class Es2PlusApiFunction(JsonHttpApiFunction):
|
||||||
"""Base classs for representing an ES2+ API Function."""
|
"""Base class for representing an ES2+ API Function."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
|
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
|
||||||
|
|||||||
@@ -25,6 +25,9 @@ import pySim.esim.rsp as rsp
|
|||||||
from pySim.esim.bsp import BspInstance
|
from pySim.esim.bsp import BspInstance
|
||||||
from pySim.esim import PMO
|
from pySim.esim import PMO
|
||||||
|
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
# Given that GSMA RSP uses ASN.1 in a very weird way, we actually cannot encode the full data type before
|
||||||
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
# signing, but we have to build parts of it separately first, then sign that, so we can put the signature
|
||||||
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
# into the same sequence as the signed data. We use the existing pySim TLV code for this.
|
||||||
@@ -73,10 +76,11 @@ def gen_replace_session_keys(ppk_enc: bytes, ppk_cmac: bytes, initial_mcv: bytes
|
|||||||
class ProfileMetadata:
|
class ProfileMetadata:
|
||||||
"""Representation of Profile metadata. Right now only the mandatory bits are
|
"""Representation of Profile metadata. Right now only the mandatory bits are
|
||||||
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
|
supported, but in general this should follow the StoreMetadataRequest of SGP.22 5.5.3"""
|
||||||
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str):
|
def __init__(self, iccid_bin: bytes, spn: str, profile_name: str, profile_class = 'operational'):
|
||||||
self.iccid_bin = iccid_bin
|
self.iccid_bin = iccid_bin
|
||||||
self.spn = spn
|
self.spn = spn
|
||||||
self.profile_name = profile_name
|
self.profile_name = profile_name
|
||||||
|
self.profile_class = profile_class
|
||||||
self.icon = None
|
self.icon = None
|
||||||
self.icon_type = None
|
self.icon_type = None
|
||||||
self.notifications = []
|
self.notifications = []
|
||||||
@@ -102,6 +106,14 @@ class ProfileMetadata:
|
|||||||
'serviceProviderName': self.spn,
|
'serviceProviderName': self.spn,
|
||||||
'profileName': self.profile_name,
|
'profileName': self.profile_name,
|
||||||
}
|
}
|
||||||
|
if self.profile_class == 'test':
|
||||||
|
smr['profileClass'] = 0
|
||||||
|
elif self.profile_class == 'provisioning':
|
||||||
|
smr['profileClass'] = 1
|
||||||
|
elif self.profile_class == 'operational':
|
||||||
|
smr['profileClass'] = 2
|
||||||
|
else:
|
||||||
|
raise ValueError('Unsupported Profile Class %s' % self.profile_class)
|
||||||
if self.icon:
|
if self.icon:
|
||||||
smr['icon'] = self.icon
|
smr['icon'] = self.icon
|
||||||
smr['iconType'] = self.icon_type
|
smr['iconType'] = self.icon_type
|
||||||
@@ -196,8 +208,12 @@ class BoundProfilePackage(ProfilePackage):
|
|||||||
# 'initialiseSecureChannelRequest'
|
# 'initialiseSecureChannelRequest'
|
||||||
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
bpp_seq = rsp.asn1.encode('InitialiseSecureChannelRequest', iscr)
|
||||||
# firstSequenceOf87
|
# firstSequenceOf87
|
||||||
|
logger.debug("BPP_ENCODE_DEBUG: Encrypting ConfigureISDP with BSP keys")
|
||||||
|
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-ENC: {bsp.c_algo.s_enc.hex()}")
|
||||||
|
logger.debug(f"BPP_ENCODE_DEBUG: BSP S-MAC: {bsp.m_algo.s_mac.hex()}")
|
||||||
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
bpp_seq += encode_seq(0xa0, bsp.encrypt_and_mac(0x87, conf_idsp_bin))
|
||||||
# sequenceOF88
|
# sequenceOF88
|
||||||
|
logger.debug("BPP_ENCODE_DEBUG: MAC-only StoreMetadata with BSP keys")
|
||||||
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
bpp_seq += encode_seq(0xa1, bsp.mac_only(0x88, smr_bin))
|
||||||
|
|
||||||
if self.ppp: # we have to use session keys
|
if self.ppp: # we have to use session keys
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
"""GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5"""
|
"""GSMA eSIM RSP ES9+ interface according to SGP.22 v2.5"""
|
||||||
|
|
||||||
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
# (C) 2024 by Harald Welte <laforge@osmocom.org>
|
||||||
#
|
#
|
||||||
|
|||||||
@@ -159,7 +159,7 @@ class ApiError(Exception):
|
|||||||
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
|
return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
|
||||||
|
|
||||||
class JsonHttpApiFunction(abc.ABC):
|
class JsonHttpApiFunction(abc.ABC):
|
||||||
"""Base classs for representing an HTTP[s] API Function."""
|
"""Base class for representing an HTTP[s] API Function."""
|
||||||
# the below class variables are expected to be overridden in derived classes
|
# the below class variables are expected to be overridden in derived classes
|
||||||
|
|
||||||
path = None
|
path = None
|
||||||
|
|||||||
@@ -90,13 +90,61 @@ class RspSessionState:
|
|||||||
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
|
# FIXME: how to add the public key from smdp_otpk to an instance of EllipticCurvePrivateKey?
|
||||||
del state['_smdp_otsk']
|
del state['_smdp_otsk']
|
||||||
del state['_smdp_ot_curve']
|
del state['_smdp_ot_curve']
|
||||||
# automatically recover all the remainig state
|
# automatically recover all the remaining state
|
||||||
self.__dict__.update(state)
|
self.__dict__.update(state)
|
||||||
|
|
||||||
|
|
||||||
class RspSessionStore(shelve.DbfilenameShelf):
|
class RspSessionStore:
|
||||||
"""A derived class as wrapper around the database-backed non-volatile storage 'shelve', in case we might
|
"""A wrapper around the database-backed storage 'shelve' for storing RspSessionState objects.
|
||||||
need to extend it in the future. We use it to store RspSessionState objects indexed by transactionId."""
|
Can be configured to use either file-based storage or in-memory storage.
|
||||||
|
We use it to store RspSessionState objects indexed by transactionId."""
|
||||||
|
|
||||||
|
def __init__(self, filename: Optional[str] = None, in_memory: bool = False):
|
||||||
|
self._in_memory = in_memory
|
||||||
|
|
||||||
|
if in_memory:
|
||||||
|
self._shelf = shelve.Shelf(dict())
|
||||||
|
else:
|
||||||
|
if filename is None:
|
||||||
|
raise ValueError("filename is required for file-based session store")
|
||||||
|
self._shelf = shelve.open(filename)
|
||||||
|
|
||||||
|
# dunder magic
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return self._shelf[key]
|
||||||
|
|
||||||
|
def __setitem__(self, key, value):
|
||||||
|
self._shelf[key] = value
|
||||||
|
|
||||||
|
def __delitem__(self, key):
|
||||||
|
del self._shelf[key]
|
||||||
|
|
||||||
|
def __contains__(self, key):
|
||||||
|
return key in self._shelf
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self._shelf)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self._shelf)
|
||||||
|
|
||||||
|
# everything else
|
||||||
|
def __getattr__(self, name):
|
||||||
|
"""Delegate attribute access to the underlying shelf object."""
|
||||||
|
return getattr(self._shelf, name)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Close the session store."""
|
||||||
|
if hasattr(self._shelf, 'close'):
|
||||||
|
self._shelf.close()
|
||||||
|
if self._in_memory:
|
||||||
|
# For in-memory store, clear the reference
|
||||||
|
self._shelf = None
|
||||||
|
|
||||||
|
def sync(self):
|
||||||
|
"""Synchronize the cache with the underlying storage."""
|
||||||
|
if hasattr(self._shelf, 'sync'):
|
||||||
|
self._shelf.sync()
|
||||||
|
|
||||||
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
|
def extract_euiccSigned1(authenticateServerResponse: bytes) -> bytes:
|
||||||
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This
|
"""Extract the raw, DER-encoded binary euiccSigned1 field from the given AuthenticateServerResponse. This
|
||||||
|
|||||||
@@ -334,7 +334,7 @@ class File:
|
|||||||
self.fill_pattern = pefi['fillPattern']
|
self.fill_pattern = pefi['fillPattern']
|
||||||
self.fill_pattern_repeat = False
|
self.fill_pattern_repeat = False
|
||||||
elif fdb_dec['file_type'] == 'df':
|
elif fdb_dec['file_type'] == 'df':
|
||||||
# only set it, if an earlier call to from_template() didn't alrady set it, as
|
# only set it, if an earlier call to from_template() didn't already set it, as
|
||||||
# the template can differentiate between MF, DF and ADF (unlike FDB)
|
# the template can differentiate between MF, DF and ADF (unlike FDB)
|
||||||
if not self.file_type:
|
if not self.file_type:
|
||||||
self.file_type = 'DF'
|
self.file_type = 'DF'
|
||||||
@@ -427,7 +427,7 @@ class File:
|
|||||||
|
|
||||||
class ProfileElement:
|
class ProfileElement:
|
||||||
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
|
"""Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
|
||||||
but ist more likely sub-classed with a specific class for the specific profile element type, like e.g
|
but it's more likely sub-classed with a specific class for the specific profile element type, like e.g
|
||||||
ProfileElementHeader, ProfileElementMF, ...
|
ProfileElementHeader, ProfileElementMF, ...
|
||||||
"""
|
"""
|
||||||
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
|
FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
|
||||||
@@ -440,7 +440,7 @@ class ProfileElement:
|
|||||||
'genericFileManagement': 'gfm-header',
|
'genericFileManagement': 'gfm-header',
|
||||||
'akaParameter': 'aka-header',
|
'akaParameter': 'aka-header',
|
||||||
'cdmaParameter': 'cdma-header',
|
'cdmaParameter': 'cdma-header',
|
||||||
# note how they couldn't even consistently captialize the 'header' suffix :(
|
# note how they couldn't even consistently capitalize the 'header' suffix :(
|
||||||
'application': 'app-Header',
|
'application': 'app-Header',
|
||||||
'pukCodes': 'puk-Header',
|
'pukCodes': 'puk-Header',
|
||||||
'pinCodes': 'pin-Header',
|
'pinCodes': 'pin-Header',
|
||||||
@@ -628,7 +628,7 @@ class FsProfileElement(ProfileElement):
|
|||||||
# this is a template that belongs into the [A]DF of another template
|
# this is a template that belongs into the [A]DF of another template
|
||||||
# 1) find the PE for the referenced template
|
# 1) find the PE for the referenced template
|
||||||
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
|
parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
|
||||||
# 2) resolve te [A]DF that forms the base of that parent PE
|
# 2) resolve the [A]DF that forms the base of that parent PE
|
||||||
pe_df = parent_pe.files[template.parent.base_df().pe_name].node
|
pe_df = parent_pe.files[template.parent.base_df().pe_name].node
|
||||||
self.pe_sequence.cur_df = pe_df
|
self.pe_sequence.cur_df = pe_df
|
||||||
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
|
self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)
|
||||||
@@ -649,7 +649,7 @@ class FsProfileElement(ProfileElement):
|
|||||||
self.add_file(file)
|
self.add_file(file)
|
||||||
|
|
||||||
def create_file(self, pename: str) -> File:
|
def create_file(self, pename: str) -> File:
|
||||||
"""Programatically create a file by its PE-Name."""
|
"""Programmatically create a file by its PE-Name."""
|
||||||
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
|
template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
|
||||||
file = File(pename, None, template.files_by_pename.get(pename, None))
|
file = File(pename, None, template.files_by_pename.get(pename, None))
|
||||||
self.add_file(file)
|
self.add_file(file)
|
||||||
@@ -1409,7 +1409,7 @@ class ProfileElementHeader(ProfileElement):
|
|||||||
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
|
iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
"""You would usually initialize an instance either with a "decoded" argument (as read from
|
"""You would usually initialize an instance either with a "decoded" argument (as read from
|
||||||
a DER-encoded SAIP file via asn1tools), or [some of] the othe arguments in case you're
|
a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're
|
||||||
constructing a Profile Header from scratch.
|
constructing a Profile Header from scratch.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -1562,7 +1562,7 @@ class ProfileElementSequence:
|
|||||||
|
|
||||||
def _rebuild_pes_by_naa(self) -> None:
|
def _rebuild_pes_by_naa(self) -> None:
|
||||||
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
|
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
|
||||||
which basically means for every NAA there's a lsit of instances, and each consists
|
which basically means for every NAA there's a list of instances, and each consists
|
||||||
of a list of a list of PEs."""
|
of a list of a list of PEs."""
|
||||||
self.pres_by_naa = {}
|
self.pres_by_naa = {}
|
||||||
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
|
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
|
||||||
@@ -1690,7 +1690,7 @@ class ProfileElementSequence:
|
|||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
def get_index_by_pe(self, pe: ProfileElement) -> int:
|
def get_index_by_pe(self, pe: ProfileElement) -> int:
|
||||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
"""Return a list with the indices of all instances of PEs of petype."""
|
||||||
ret = []
|
ret = []
|
||||||
i = 0
|
i = 0
|
||||||
for cur in self.pe_list:
|
for cur in self.pe_list:
|
||||||
@@ -1711,7 +1711,7 @@ class ProfileElementSequence:
|
|||||||
self.insert_at_index(idx+1, pe_new)
|
self.insert_at_index(idx+1, pe_new)
|
||||||
|
|
||||||
def get_index_by_type(self, petype: str) -> List[int]:
|
def get_index_by_type(self, petype: str) -> List[int]:
|
||||||
"""Return a list with the indicies of all instances of PEs of petype."""
|
"""Return a list with the indices of all instances of PEs of petype."""
|
||||||
ret = []
|
ret = []
|
||||||
i = 0
|
i = 0
|
||||||
for pe in self.pe_list:
|
for pe in self.pe_list:
|
||||||
@@ -1736,7 +1736,7 @@ class ProfileElementSequence:
|
|||||||
for service in naa.mandatory_services:
|
for service in naa.mandatory_services:
|
||||||
if service in hdr.decoded['eUICC-Mandatory-services']:
|
if service in hdr.decoded['eUICC-Mandatory-services']:
|
||||||
del hdr.decoded['eUICC-Mandatory-services'][service]
|
del hdr.decoded['eUICC-Mandatory-services'][service]
|
||||||
# remove any associaed mandatory filesystem templates
|
# remove any associated mandatory filesystem templates
|
||||||
for template in naa.templates:
|
for template in naa.templates:
|
||||||
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
|
if template in hdr.decoded['eUICC-Mandatory-GFSTEList']:
|
||||||
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
|
||||||
@@ -2040,7 +2040,8 @@ class FsNodeADF(FsNodeDF):
|
|||||||
super().__init__(fid, parent, file, name)
|
super().__init__(fid, parent, file, name)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name))
|
# self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it
|
||||||
|
return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
|
||||||
|
|
||||||
class FsNodeMF(FsNodeDF):
|
class FsNodeMF(FsNodeDF):
|
||||||
"""The MF (Master File) in the filesystem hierarchy."""
|
"""The MF (Master File) in the filesystem hierarchy."""
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ class CheckBasicStructure(ProfileConstraintChecker):
|
|||||||
|
|
||||||
def check_optional_ordering(self, pes: ProfileElementSequence):
|
def check_optional_ordering(self, pes: ProfileElementSequence):
|
||||||
"""Check the ordering of optional PEs following the respective mandatory ones."""
|
"""Check the ordering of optional PEs following the respective mandatory ones."""
|
||||||
# ordering and required depenencies
|
# ordering and required dependencies
|
||||||
self._is_after_if_exists(pes,'opt-usim', 'usim')
|
self._is_after_if_exists(pes,'opt-usim', 'usim')
|
||||||
self._is_after_if_exists(pes,'opt-isim', 'isim')
|
self._is_after_if_exists(pes,'opt-isim', 'isim')
|
||||||
self._is_after_if_exists(pes,'gsm-access', 'usim')
|
self._is_after_if_exists(pes,'gsm-access', 'usim')
|
||||||
|
|||||||
@@ -149,7 +149,7 @@ class CertificateSet:
|
|||||||
check_signed(c, self.root_cert)
|
check_signed(c, self.root_cert)
|
||||||
return
|
return
|
||||||
parent_cert = self.intermediate_certs.get(aki, None)
|
parent_cert = self.intermediate_certs.get(aki, None)
|
||||||
if not aki:
|
if not parent_cert:
|
||||||
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
|
raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki))
|
||||||
check_signed(c, parent_cert)
|
check_signed(c, parent_cert)
|
||||||
# if we reach here, we passed (no exception raised)
|
# if we reach here, we passed (no exception raised)
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ from osmocom.utils import h2b, b2h, is_hex, auto_int, auto_uint8, auto_uint16, i
|
|||||||
from osmocom.tlv import bertlv_parse_one
|
from osmocom.tlv import bertlv_parse_one
|
||||||
from osmocom.construct import filter_dict, parse_construct, build_construct
|
from osmocom.construct import filter_dict, parse_construct, build_construct
|
||||||
|
|
||||||
from pySim.utils import sw_match
|
from pySim.utils import sw_match, decomposeATR
|
||||||
from pySim.jsonpath import js_path_modify
|
from pySim.jsonpath import js_path_modify
|
||||||
from pySim.commands import SimCardCommands
|
from pySim.commands import SimCardCommands
|
||||||
from pySim.exceptions import SwMatchError
|
from pySim.exceptions import SwMatchError
|
||||||
@@ -86,7 +86,7 @@ class CardFile:
|
|||||||
self.service = service
|
self.service = service
|
||||||
self.shell_commands = [] # type: List[CommandSet]
|
self.shell_commands = [] # type: List[CommandSet]
|
||||||
|
|
||||||
# Note: the basic properties (fid, name, ect.) are verified when
|
# Note: the basic properties (fid, name, etc.) are verified when
|
||||||
# the file is attached to a parent file. See method add_file() in
|
# the file is attached to a parent file. See method add_file() in
|
||||||
# class Card DF
|
# class Card DF
|
||||||
|
|
||||||
@@ -266,7 +266,7 @@ class CardFile:
|
|||||||
def get_profile(self):
|
def get_profile(self):
|
||||||
"""Get the profile associated with this file. If this file does not have any
|
"""Get the profile associated with this file. If this file does not have any
|
||||||
profile assigned, try to find a file above (usually the MF) in the filesystem
|
profile assigned, try to find a file above (usually the MF) in the filesystem
|
||||||
hirarchy that has a profile assigned
|
hierarchy that has a profile assigned
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# If we have a profile set, return it
|
# If we have a profile set, return it
|
||||||
@@ -679,7 +679,7 @@ class TransparentEF(CardEF):
|
|||||||
Args:
|
Args:
|
||||||
fid : File Identifier (4 hex digits)
|
fid : File Identifier (4 hex digits)
|
||||||
sfid : Short File Identifier (2 hex digits, optional)
|
sfid : Short File Identifier (2 hex digits, optional)
|
||||||
name : Brief name of the file, lik EF_ICCID
|
name : Brief name of the file, like EF_ICCID
|
||||||
desc : Description of the file
|
desc : Description of the file
|
||||||
parent : Parent CardFile object within filesystem hierarchy
|
parent : Parent CardFile object within filesystem hierarchy
|
||||||
size : tuple of (minimum_size, recommended_size)
|
size : tuple of (minimum_size, recommended_size)
|
||||||
@@ -982,11 +982,11 @@ class LinFixedEF(CardEF):
|
|||||||
Args:
|
Args:
|
||||||
fid : File Identifier (4 hex digits)
|
fid : File Identifier (4 hex digits)
|
||||||
sfid : Short File Identifier (2 hex digits, optional)
|
sfid : Short File Identifier (2 hex digits, optional)
|
||||||
name : Brief name of the file, lik EF_ICCID
|
name : Brief name of the file, like EF_ICCID
|
||||||
desc : Description of the file
|
desc : Description of the file
|
||||||
parent : Parent CardFile object within filesystem hierarchy
|
parent : Parent CardFile object within filesystem hierarchy
|
||||||
rec_len : Tuple of (minimum_length, recommended_length)
|
rec_len : Tuple of (minimum_length, recommended_length)
|
||||||
leftpad: On write, data must be padded from the left to fit pysical record length
|
leftpad: On write, data must be padded from the left to fit physical record length
|
||||||
"""
|
"""
|
||||||
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
|
super().__init__(fid=fid, sfid=sfid, name=name, desc=desc, parent=parent, **kwargs)
|
||||||
self.rec_len = rec_len
|
self.rec_len = rec_len
|
||||||
@@ -1422,7 +1422,7 @@ class BerTlvEF(CardEF):
|
|||||||
Args:
|
Args:
|
||||||
fid : File Identifier (4 hex digits)
|
fid : File Identifier (4 hex digits)
|
||||||
sfid : Short File Identifier (2 hex digits, optional)
|
sfid : Short File Identifier (2 hex digits, optional)
|
||||||
name : Brief name of the file, lik EF_ICCID
|
name : Brief name of the file, like EF_ICCID
|
||||||
desc : Description of the file
|
desc : Description of the file
|
||||||
parent : Parent CardFile object within filesystem hierarchy
|
parent : Parent CardFile object within filesystem hierarchy
|
||||||
size : tuple of (minimum_size, recommended_size)
|
size : tuple of (minimum_size, recommended_size)
|
||||||
@@ -1455,7 +1455,7 @@ class BerTlvEF(CardEF):
|
|||||||
export_str += "delete_all\n"
|
export_str += "delete_all\n"
|
||||||
for t in tags:
|
for t in tags:
|
||||||
result = lchan.retrieve_data(t)
|
result = lchan.retrieve_data(t)
|
||||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
(tag, l, val, remainder) = bertlv_parse_one(h2b(result[0]))
|
||||||
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
|
export_str += ("set_data 0x%02x %s\n" % (t, b2h(val)))
|
||||||
return export_str.strip()
|
return export_str.strip()
|
||||||
|
|
||||||
@@ -1495,7 +1495,7 @@ class CardApplication:
|
|||||||
self.name = name
|
self.name = name
|
||||||
self.adf = adf
|
self.adf = adf
|
||||||
self.sw = sw or {}
|
self.sw = sw or {}
|
||||||
# back-reference from ADF to Applicaiton
|
# back-reference from ADF to Application
|
||||||
if self.adf:
|
if self.adf:
|
||||||
self.aid = aid or self.adf.aid
|
self.aid = aid or self.adf.aid
|
||||||
self.adf.application = self
|
self.adf.application = self
|
||||||
@@ -1545,6 +1545,13 @@ class CardModel(abc.ABC):
|
|||||||
if atr == card_atr:
|
if atr == card_atr:
|
||||||
print("Detected CardModel:", cls.__name__)
|
print("Detected CardModel:", cls.__name__)
|
||||||
return True
|
return True
|
||||||
|
# if nothing found try to just compare the Historical Bytes of the ATR
|
||||||
|
card_atr_hb = decomposeATR(card_atr)['hb']
|
||||||
|
for atr in cls._atrs:
|
||||||
|
atr_hb = decomposeATR(atr)['hb']
|
||||||
|
if atr_hb == card_atr_hb:
|
||||||
|
print("Detected CardModel:", cls.__name__)
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -1565,7 +1572,7 @@ class Path:
|
|||||||
p = p.split('/')
|
p = p.split('/')
|
||||||
elif len(p) and isinstance(p[0], int):
|
elif len(p) and isinstance(p[0], int):
|
||||||
p = ['%04x' % x for x in p]
|
p = ['%04x' % x for x in p]
|
||||||
# make sure internal representation alwas is uppercase only
|
# make sure internal representation always is uppercase only
|
||||||
self.list = [x.upper() for x in p]
|
self.list = [x.upper() for x in p]
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
|||||||
@@ -627,7 +627,7 @@ class ADF_SD(CardADF):
|
|||||||
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
|
kcv_bin = compute_kcv(opts.key_type[i], h2b(opts.key_data[i])) or b''
|
||||||
kcv = b2h(kcv_bin)
|
kcv = b2h(kcv_bin)
|
||||||
if self._cmd.lchan.scc.scp:
|
if self._cmd.lchan.scc.scp:
|
||||||
# encrypte key data with DEK of current SCP
|
# encrypted key data with DEK of current SCP
|
||||||
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
|
kcb = b2h(self._cmd.lchan.scc.scp.encrypt_key(h2b(opts.key_data[i])))
|
||||||
else:
|
else:
|
||||||
# (for example) during personalization, DEK might not be required)
|
# (for example) during personalization, DEK might not be required)
|
||||||
@@ -755,7 +755,7 @@ class ADF_SD(CardADF):
|
|||||||
|
|
||||||
inst_load_parser = argparse.ArgumentParser()
|
inst_load_parser = argparse.ArgumentParser()
|
||||||
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
|
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
|
||||||
help='AID of the loded file')
|
help='AID of the loaded file')
|
||||||
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
|
inst_load_parser.add_argument('--security-domain-aid', type=is_hexstr, default='',
|
||||||
help='AID of the Security Domain into which the file shalle be added')
|
help='AID of the Security Domain into which the file shalle be added')
|
||||||
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
|
inst_load_parser.add_argument('--load-file-hash', type=is_hexstr, default='',
|
||||||
@@ -845,7 +845,7 @@ class ADF_SD(CardADF):
|
|||||||
# TODO:tune chunk_len based on the overhead of the used SCP?
|
# TODO:tune chunk_len based on the overhead of the used SCP?
|
||||||
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
|
# build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
|
||||||
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
|
remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
|
||||||
# transfer this in vaious chunks to the card
|
# transfer this in various chunks to the card
|
||||||
total_size = len(remainder)
|
total_size = len(remainder)
|
||||||
block_nr = 0
|
block_nr = 0
|
||||||
while len(remainder):
|
while len(remainder):
|
||||||
|
|||||||
@@ -104,4 +104,4 @@ class UiccSdInstallParams(TLV_IE_Collection, nested=[UiccScp, AcceptExtradAppsAn
|
|||||||
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
|
# KID 0x02: SK.CASD.AUT (PK) and KS.CASD.AUT (Non-PK)
|
||||||
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
|
# KID 0x03: SK.CASD.CT (P) and KS.CASD.CT (Non-PK)
|
||||||
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
|
# KVN 0x75 KID 0x01: 16-byte DES key for Ciphered Load File Data Block
|
||||||
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s upport
|
# KVN 0xFF reserved for ISD with SCP02 without SCP80 s support
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ class CapFile():
|
|||||||
raise ValueError("invalid cap file, %s missing!" % required_components[component])
|
raise ValueError("invalid cap file, %s missing!" % required_components[component])
|
||||||
|
|
||||||
def get_loadfile(self) -> bytes:
|
def get_loadfile(self) -> bytes:
|
||||||
"""Get the executeable loadfile as hexstring"""
|
"""Get the executable loadfile as hexstring"""
|
||||||
# Concatenate all cap file components in the specified order
|
# Concatenate all cap file components in the specified order
|
||||||
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
|
# see also: Java Card Platform Virtual Machine Specification, v3.2, section 6.3
|
||||||
loadfile = self.__component['Header']
|
loadfile = self.__component['Header']
|
||||||
|
|||||||
@@ -495,7 +495,7 @@ class IsimCard(UiccCardBase):
|
|||||||
|
|
||||||
class MagicSimBase(abc.ABC, SimCard):
|
class MagicSimBase(abc.ABC, SimCard):
|
||||||
"""
|
"""
|
||||||
Theses cards uses several record based EFs to store the provider infos,
|
These cards uses several record based EFs to store the provider infos,
|
||||||
each possible provider uses a specific record number in each EF. The
|
each possible provider uses a specific record number in each EF. The
|
||||||
indexes used are ( where N is the number of providers supported ) :
|
indexes used are ( where N is the number of providers supported ) :
|
||||||
- [2 .. N+1] for the operator name
|
- [2 .. N+1] for the operator name
|
||||||
@@ -644,7 +644,7 @@ class MagicSim(MagicSimBase):
|
|||||||
|
|
||||||
class FakeMagicSim(SimCard):
|
class FakeMagicSim(SimCard):
|
||||||
"""
|
"""
|
||||||
Theses cards have a record based EF 3f00/000c that contains the provider
|
These cards have a record based EF 3f00/000c that contains the provider
|
||||||
information. See the program method for its format. The records go from
|
information. See the program method for its format. The records go from
|
||||||
1 to N.
|
1 to N.
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -296,7 +296,7 @@ def dec_addr_tlv(hexstr):
|
|||||||
|
|
||||||
elif addr_type == 0x01: # IPv4
|
elif addr_type == 0x01: # IPv4
|
||||||
# Skip address tye byte i.e. first byte in value list
|
# Skip address tye byte i.e. first byte in value list
|
||||||
# Skip the unused byte in Octect 4 after address type byte as per 3GPP TS 31.102
|
# Skip the unused byte in Octet 4 after address type byte as per 3GPP TS 31.102
|
||||||
ipv4 = tlv[2][2:]
|
ipv4 = tlv[2][2:]
|
||||||
content = '.'.join(str(x) for x in ipv4)
|
content = '.'.join(str(x) for x in ipv4)
|
||||||
return (content, '01')
|
return (content, '01')
|
||||||
|
|||||||
125
pySim/log.py
Normal file
125
pySim/log.py
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
""" pySim: Logging
|
||||||
|
"""
|
||||||
|
|
||||||
|
#
|
||||||
|
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from cmd2 import style
|
||||||
|
|
||||||
|
class _PySimLogHandler(logging.Handler):
|
||||||
|
def __init__(self, log_callback):
|
||||||
|
super().__init__()
|
||||||
|
self.log_callback = log_callback
|
||||||
|
|
||||||
|
def emit(self, record):
|
||||||
|
formatted_message = self.format(record)
|
||||||
|
self.log_callback(formatted_message, record)
|
||||||
|
|
||||||
|
class PySimLogger:
|
||||||
|
"""
|
||||||
|
Static class to centralize the log output of PySim applications. This class can be used to print log messages from
|
||||||
|
any pySim module. Configuration of the log behaviour (see setup and set_ methods) is entirely optional. In case no
|
||||||
|
print callback is set (see setup method), the logger will pass the log messages directly to print() without applying
|
||||||
|
any formatting to the original log message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
LOG_FMTSTR = "%(levelname)s: %(message)s"
|
||||||
|
LOG_FMTSTR_VERBOSE = "%(module)s.%(lineno)d -- %(name)s - " + LOG_FMTSTR
|
||||||
|
__formatter = logging.Formatter(LOG_FMTSTR)
|
||||||
|
__formatter_verbose = logging.Formatter(LOG_FMTSTR_VERBOSE)
|
||||||
|
|
||||||
|
# No print callback by default, means that log messages are passed directly to print()
|
||||||
|
print_callback = None
|
||||||
|
|
||||||
|
# No specific color scheme by default
|
||||||
|
colors = {}
|
||||||
|
|
||||||
|
# The logging default is non-verbose logging on logging level DEBUG. This is a safe default that works for
|
||||||
|
# applications that ignore the presence of the PySimLogger class.
|
||||||
|
verbose = False
|
||||||
|
logging.root.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
raise RuntimeError('static class, do not instantiate')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def setup(print_callback = None, colors:dict = {}):
|
||||||
|
"""
|
||||||
|
Set a print callback function and color scheme. This function call is optional. In case this method is not
|
||||||
|
called, default settings apply.
|
||||||
|
Args:
|
||||||
|
print_callback : A callback function that accepts the resulting log string as input. The callback should
|
||||||
|
have the following format: print_callback(message:str)
|
||||||
|
colors : An optional dict through which certain log levels can be assigned a color.
|
||||||
|
(e.g. {logging.WARN: YELLOW})
|
||||||
|
"""
|
||||||
|
PySimLogger.print_callback = print_callback
|
||||||
|
PySimLogger.colors = colors
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def set_verbose(verbose:bool = False):
|
||||||
|
"""
|
||||||
|
Enable/disable verbose logging. (has no effect in case no print callback is set, see method setup)
|
||||||
|
Args:
|
||||||
|
verbose: verbosity (True = verbose logging, False = normal logging)
|
||||||
|
"""
|
||||||
|
PySimLogger.verbose = verbose;
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def set_level(level:int = logging.DEBUG):
|
||||||
|
"""
|
||||||
|
Set the logging level.
|
||||||
|
Args:
|
||||||
|
level: Logging level, valis log leves are: DEBUG, INFO, WARNING, ERROR and CRITICAL
|
||||||
|
"""
|
||||||
|
logging.root.setLevel(level)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _log_callback(message, record):
|
||||||
|
if not PySimLogger.print_callback:
|
||||||
|
# In case no print callback has been set display the message as if it were printed trough a normal
|
||||||
|
# python print statement.
|
||||||
|
print(record.message)
|
||||||
|
else:
|
||||||
|
# When a print callback is set, use it to display the log line. Apply color if the API user chose one
|
||||||
|
if PySimLogger.verbose:
|
||||||
|
formatted_message = logging.Formatter.format(PySimLogger.__formatter_verbose, record)
|
||||||
|
else:
|
||||||
|
formatted_message = logging.Formatter.format(PySimLogger.__formatter, record)
|
||||||
|
color = PySimLogger.colors.get(record.levelno)
|
||||||
|
if color:
|
||||||
|
PySimLogger.print_callback(style(formatted_message, fg = color))
|
||||||
|
else:
|
||||||
|
PySimLogger.print_callback(formatted_message)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get(log_facility: str):
|
||||||
|
"""
|
||||||
|
Set up and return a new python logger object
|
||||||
|
Args:
|
||||||
|
log_facility : Name of log facility (e.g. "MAIN", "RUNTIME"...)
|
||||||
|
"""
|
||||||
|
logger = logging.getLogger(log_facility)
|
||||||
|
handler = _PySimLogHandler(log_callback=PySimLogger._log_callback)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
return logger
|
||||||
@@ -110,7 +110,7 @@ class CardProfile:
|
|||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
def _try_match_card(cls, scc: SimCardCommands) -> None:
|
||||||
"""Try to see if the specific profile matches the card. This method is a
|
"""Try to see if the specific profile matches the card. This method is a
|
||||||
placeholder that is overloaded by specific dirived classes. The method
|
placeholder that is overloaded by specific derived classes. The method
|
||||||
actively probes the card to make sure the profile class matches the
|
actively probes the card to make sure the profile class matches the
|
||||||
physical card. This usually also means that the card is reset during
|
physical card. This usually also means that the card is reset during
|
||||||
the process, so this method must not be called at random times. It may
|
the process, so this method must not be called at random times. It may
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ from osmocom.tlv import bertlv_parse_one
|
|||||||
|
|
||||||
from pySim.exceptions import *
|
from pySim.exceptions import *
|
||||||
from pySim.filesystem import *
|
from pySim.filesystem import *
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
|
||||||
|
log = PySimLogger.get("RUNTIME")
|
||||||
|
|
||||||
def lchan_nr_from_cla(cla: int) -> int:
|
def lchan_nr_from_cla(cla: int) -> int:
|
||||||
"""Resolve the logical channel number from the CLA byte."""
|
"""Resolve the logical channel number from the CLA byte."""
|
||||||
@@ -44,6 +47,7 @@ class RuntimeState:
|
|||||||
card : pysim.cards.Card instance
|
card : pysim.cards.Card instance
|
||||||
profile : CardProfile instance
|
profile : CardProfile instance
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.mf = CardMF(profile=profile)
|
self.mf = CardMF(profile=profile)
|
||||||
self.card = card
|
self.card = card
|
||||||
self.profile = profile
|
self.profile = profile
|
||||||
@@ -60,10 +64,13 @@ class RuntimeState:
|
|||||||
self.card.set_apdu_parameter(
|
self.card.set_apdu_parameter(
|
||||||
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
|
cla=self.profile.cla, sel_ctrl=self.profile.sel_ctrl)
|
||||||
|
|
||||||
|
# make sure MF is selected before probing for Addons
|
||||||
|
self.lchan[0].select('MF')
|
||||||
|
|
||||||
for addon_cls in self.profile.addons:
|
for addon_cls in self.profile.addons:
|
||||||
addon = addon_cls()
|
addon = addon_cls()
|
||||||
if addon.probe(self.card):
|
if addon.probe(self.card):
|
||||||
print("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
log.info("Detected %s Add-on \"%s\"" % (self.profile, addon))
|
||||||
for f in addon.files_in_mf:
|
for f in addon.files_in_mf:
|
||||||
self.mf.add_file(f)
|
self.mf.add_file(f)
|
||||||
|
|
||||||
@@ -97,18 +104,18 @@ class RuntimeState:
|
|||||||
apps_taken = []
|
apps_taken = []
|
||||||
if aids_card:
|
if aids_card:
|
||||||
aids_taken = []
|
aids_taken = []
|
||||||
print("AIDs on card:")
|
log.info("AIDs on card:")
|
||||||
for a in aids_card:
|
for a in aids_card:
|
||||||
for f in apps_profile:
|
for f in apps_profile:
|
||||||
if f.aid in a:
|
if f.aid in a:
|
||||||
print(" %s: %s (EF.DIR)" % (f.name, a))
|
log.info(" %s: %s (EF.DIR)" % (f.name, a))
|
||||||
aids_taken.append(a)
|
aids_taken.append(a)
|
||||||
apps_taken.append(f)
|
apps_taken.append(f)
|
||||||
aids_unknown = set(aids_card) - set(aids_taken)
|
aids_unknown = set(aids_card) - set(aids_taken)
|
||||||
for a in aids_unknown:
|
for a in aids_unknown:
|
||||||
print(" unknown: %s (EF.DIR)" % a)
|
log.info(" unknown: %s (EF.DIR)" % a)
|
||||||
else:
|
else:
|
||||||
print("warning: EF.DIR seems to be empty!")
|
log.warn("EF.DIR seems to be empty!")
|
||||||
|
|
||||||
# Some card applications may not be registered in EF.DIR, we will actively
|
# Some card applications may not be registered in EF.DIR, we will actively
|
||||||
# probe for those applications
|
# probe for those applications
|
||||||
@@ -123,7 +130,7 @@ class RuntimeState:
|
|||||||
_data, sw = self.card.select_adf_by_aid(f.aid)
|
_data, sw = self.card.select_adf_by_aid(f.aid)
|
||||||
self.selected_adf = f
|
self.selected_adf = f
|
||||||
if sw == "9000":
|
if sw == "9000":
|
||||||
print(" %s: %s" % (f.name, f.aid))
|
log.info(" %s: %s" % (f.name, f.aid))
|
||||||
apps_taken.append(f)
|
apps_taken.append(f)
|
||||||
except (SwMatchError, ProtocolError):
|
except (SwMatchError, ProtocolError):
|
||||||
pass
|
pass
|
||||||
@@ -147,7 +154,7 @@ class RuntimeState:
|
|||||||
# select MF to reset internal state and to verify card really works
|
# select MF to reset internal state and to verify card really works
|
||||||
self.lchan[0].select('MF', cmd_app)
|
self.lchan[0].select('MF', cmd_app)
|
||||||
self.lchan[0].selected_adf = None
|
self.lchan[0].selected_adf = None
|
||||||
# store ATR as part of our card identies dict
|
# store ATR as part of our card identities dict
|
||||||
self.identity['ATR'] = atr
|
self.identity['ATR'] = atr
|
||||||
return atr
|
return atr
|
||||||
|
|
||||||
@@ -321,7 +328,7 @@ class RuntimeLchan:
|
|||||||
# If we succeed, we know that the file exists on the card and we may
|
# If we succeed, we know that the file exists on the card and we may
|
||||||
# proceed with creating a new CardEF object in the local file model at
|
# proceed with creating a new CardEF object in the local file model at
|
||||||
# run time. In case the file does not exist on the card, we just abort.
|
# run time. In case the file does not exist on the card, we just abort.
|
||||||
# The state on the card (selected file/application) wont't be changed,
|
# The state on the card (selected file/application) won't be changed,
|
||||||
# so we do not have to update any state in that case.
|
# so we do not have to update any state in that case.
|
||||||
(data, _sw) = self.scc.select_file(fid)
|
(data, _sw) = self.scc.select_file(fid)
|
||||||
except SwMatchError as swm:
|
except SwMatchError as swm:
|
||||||
@@ -507,6 +514,47 @@ class RuntimeLchan:
|
|||||||
dec_data = self.selected_file.decode_hex(data)
|
dec_data = self.selected_file.decode_hex(data)
|
||||||
return (dec_data, sw)
|
return (dec_data, sw)
|
||||||
|
|
||||||
|
def __get_writeable_size(self):
|
||||||
|
""" Determine the writable size (file or record) using the cached FCP parameters of the currently selected
|
||||||
|
file. Return None in case the writeable size cannot be determined (no FCP available, FCP lacks size
|
||||||
|
information).
|
||||||
|
"""
|
||||||
|
fcp = self.selected_file_fcp
|
||||||
|
if not fcp:
|
||||||
|
return None
|
||||||
|
|
||||||
|
structure = fcp.get('file_descriptor', {}).get('file_descriptor_byte', {}).get('structure')
|
||||||
|
if not structure:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if structure == 'transparent':
|
||||||
|
return fcp.get('file_size')
|
||||||
|
elif structure == 'linear_fixed':
|
||||||
|
return fcp.get('file_descriptor', {}).get('record_len')
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __check_writeable_size(self, data_len):
|
||||||
|
""" Guard against unsuccessful writes caused by attempts to write data that exceeds the file limits. """
|
||||||
|
|
||||||
|
writeable_size = self.__get_writeable_size()
|
||||||
|
if not writeable_size:
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(self.selected_file, TransparentEF):
|
||||||
|
writeable_name = "file"
|
||||||
|
elif isinstance(self.selected_file, LinFixedEF):
|
||||||
|
writeable_name = "record"
|
||||||
|
else:
|
||||||
|
writeable_name = "object"
|
||||||
|
|
||||||
|
if data_len > writeable_size:
|
||||||
|
raise TypeError("Data length (%u) exceeds %s size (%u) by %u bytes" %
|
||||||
|
(data_len, writeable_name, writeable_size, data_len - writeable_size))
|
||||||
|
elif data_len < writeable_size:
|
||||||
|
log.warn("Data length (%u) less than %s size (%u), leaving %u unwritten bytes at the end of the %s" %
|
||||||
|
(data_len, writeable_name, writeable_size, writeable_size - data_len, writeable_name))
|
||||||
|
|
||||||
def update_binary(self, data_hex: str, offset: int = 0):
|
def update_binary(self, data_hex: str, offset: int = 0):
|
||||||
"""Update transparent EF binary data.
|
"""Update transparent EF binary data.
|
||||||
|
|
||||||
@@ -517,6 +565,7 @@ class RuntimeLchan:
|
|||||||
if not isinstance(self.selected_file, TransparentEF):
|
if not isinstance(self.selected_file, TransparentEF):
|
||||||
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
|
raise TypeError("Only works with TransparentEF, but %s is %s" % (self.selected_file,
|
||||||
self.selected_file.__class__.__mro__))
|
self.selected_file.__class__.__mro__))
|
||||||
|
self.__check_writeable_size(len(data_hex) // 2 + offset)
|
||||||
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
|
return self.scc.update_binary(self.selected_file.fid, data_hex, offset, conserve=self.rs.conserve_write)
|
||||||
|
|
||||||
def update_binary_dec(self, data: dict):
|
def update_binary_dec(self, data: dict):
|
||||||
@@ -564,6 +613,7 @@ class RuntimeLchan:
|
|||||||
if not isinstance(self.selected_file, LinFixedEF):
|
if not isinstance(self.selected_file, LinFixedEF):
|
||||||
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
|
raise TypeError("Only works with Linear Fixed EF, but %s is %s" % (self.selected_file,
|
||||||
self.selected_file.__class__.__mro__))
|
self.selected_file.__class__.__mro__))
|
||||||
|
self.__check_writeable_size(len(data_hex) // 2)
|
||||||
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
|
return self.scc.update_record(self.selected_file.fid, rec_nr, data_hex,
|
||||||
conserve=self.rs.conserve_write,
|
conserve=self.rs.conserve_write,
|
||||||
leftpad=self.selected_file.leftpad)
|
leftpad=self.selected_file.leftpad)
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ class SecureChannel(abc.ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
|
def send_apdu_wrapper(self, send_fn: callable, pdu: Hexstr, *args, **kwargs) -> ResTuple:
|
||||||
"""Wrapper function to wrap command APDU and unwrap repsonse APDU around send_apdu callable."""
|
"""Wrapper function to wrap command APDU and unwrap response APDU around send_apdu callable."""
|
||||||
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
|
pdu_wrapped = b2h(self.wrap_cmd_apdu(h2b(pdu)))
|
||||||
res, sw = send_fn(pdu_wrapped, *args, **kwargs)
|
res, sw = send_fn(pdu_wrapped, *args, **kwargs)
|
||||||
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))
|
res_unwrapped = b2h(self.unwrap_rsp_apdu(h2b(sw), h2b(res)))
|
||||||
|
|||||||
@@ -200,7 +200,7 @@ class LinkBase(abc.ABC):
|
|||||||
# It *was* successful after all -- the extra pieces FETCH handled
|
# It *was* successful after all -- the extra pieces FETCH handled
|
||||||
# need not concern the caller.
|
# need not concern the caller.
|
||||||
rv = (rv[0], '9000')
|
rv = (rv[0], '9000')
|
||||||
# proactive sim as per TS 102 221 Setion 7.4.2
|
# proactive sim as per TS 102 221 Section 7.4.2
|
||||||
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
|
# TODO: Check SW manually to avoid recursing on the stack (provided this piece of code stays in this place)
|
||||||
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
|
fetch_rv = self.send_apdu_checksw('80120000' + last_sw[2:], sw)
|
||||||
# Setting this in case we later decide not to send a terminal
|
# Setting this in case we later decide not to send a terminal
|
||||||
@@ -228,7 +228,7 @@ class LinkBase(abc.ABC):
|
|||||||
# Structure as per TS 102 223 V4.4.0 Section 6.8
|
# Structure as per TS 102 223 V4.4.0 Section 6.8
|
||||||
|
|
||||||
# Testing hint: The value of tail does not influence the behavior
|
# Testing hint: The value of tail does not influence the behavior
|
||||||
# of an SJA2 that sent ans SMS, so this is implemented only
|
# of an SJA2 that sent an SMS, so this is implemented only
|
||||||
# following TS 102 223, and not fully tested.
|
# following TS 102 223, and not fully tested.
|
||||||
ti_list_bin = [x.to_tlv() for x in ti_list]
|
ti_list_bin = [x.to_tlv() for x in ti_list]
|
||||||
tail = b''.join(ti_list_bin)
|
tail = b''.join(ti_list_bin)
|
||||||
|
|||||||
@@ -208,7 +208,7 @@ EF_5G_PROSE_ST_map = {
|
|||||||
5: '5G ProSe configuration data for usage information reporting',
|
5: '5G ProSe configuration data for usage information reporting',
|
||||||
}
|
}
|
||||||
|
|
||||||
# Mapping between USIM Enbled Service Number and its description
|
# Mapping between USIM Enabled Service Number and its description
|
||||||
EF_EST_map = {
|
EF_EST_map = {
|
||||||
1: 'Fixed Dialling Numbers (FDN)',
|
1: 'Fixed Dialling Numbers (FDN)',
|
||||||
2: 'Barred Dialling Numbers (BDN)',
|
2: 'Barred Dialling Numbers (BDN)',
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ class EF_AC_GBAUAPI(LinFixedEF):
|
|||||||
"""The use of this EF is eescribed in 3GPP TS 31.130"""
|
"""The use of this EF is eescribed in 3GPP TS 31.130"""
|
||||||
class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
|
class AppletNafAccessControl(BER_TLV_IE, tag=0x80):
|
||||||
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
|
# the use of Int8ub as length field in Prefixed is strictly speaking incorrect, as it is a BER-TLV
|
||||||
# length field whihc will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
# length field which will consume two bytes from length > 127 bytes. However, AIDs and NAF IDs can
|
||||||
# safely be assumed shorter than that
|
# safely be assumed shorter than that
|
||||||
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
|
_construct = Struct('aid'/Prefixed(Int8ub, GreedyBytes),
|
||||||
'naf_id'/Prefixed(Int8ub, GreedyBytes))
|
'naf_id'/Prefixed(Int8ub, GreedyBytes))
|
||||||
|
|||||||
@@ -1007,7 +1007,7 @@ class EF_ICCID(TransparentEF):
|
|||||||
def _encode_hex(self, abstract, **kwargs):
|
def _encode_hex(self, abstract, **kwargs):
|
||||||
return enc_iccid(abstract['iccid'])
|
return enc_iccid(abstract['iccid'])
|
||||||
|
|
||||||
# TS 102 221 Section 13.3 / TS 31.101 Secction 13 / TS 51.011 Section 10.1.2
|
# TS 102 221 Section 13.3 / TS 31.101 Section 13 / TS 51.011 Section 10.1.2
|
||||||
class EF_PL(TransRecEF):
|
class EF_PL(TransRecEF):
|
||||||
_test_de_encode = [
|
_test_de_encode = [
|
||||||
( '6465', "de" ),
|
( '6465', "de" ),
|
||||||
|
|||||||
139
pySim/utils.py
139
pySim/utils.py
@@ -15,6 +15,7 @@ from osmocom.tlv import bertlv_encode_tag, bertlv_encode_len
|
|||||||
|
|
||||||
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
# Copyright (C) 2009-2010 Sylvain Munaut <tnt@246tNt.com>
|
||||||
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
|
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
|
||||||
|
# Copyright (C) 2009-2022 Ludovic Rousseau
|
||||||
#
|
#
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU General Public License as published by
|
# it under the terms of the GNU General Public License as published by
|
||||||
@@ -330,7 +331,7 @@ def derive_mnc(digit1: int, digit2: int, digit3: int = 0x0f) -> int:
|
|||||||
mnc = 0
|
mnc = 0
|
||||||
|
|
||||||
# 3-rd digit is optional for the MNC. If present
|
# 3-rd digit is optional for the MNC. If present
|
||||||
# the algorythm is the same as for the MCC.
|
# the algorithm is the same as for the MCC.
|
||||||
if digit3 != 0x0f:
|
if digit3 != 0x0f:
|
||||||
return derive_mcc(digit1, digit2, digit3)
|
return derive_mcc(digit1, digit2, digit3)
|
||||||
|
|
||||||
@@ -410,7 +411,7 @@ def get_addr_type(addr):
|
|||||||
|
|
||||||
fqdn_flag = True
|
fqdn_flag = True
|
||||||
for i in addr_list:
|
for i in addr_list:
|
||||||
# Only Alpha-numeric characters and hyphen - RFC 1035
|
# Only Alphanumeric characters and hyphen - RFC 1035
|
||||||
import re
|
import re
|
||||||
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
|
if not re.match("^[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)?$", i):
|
||||||
fqdn_flag = False
|
fqdn_flag = False
|
||||||
@@ -476,7 +477,7 @@ def expand_hex(hexstring, length):
|
|||||||
"""Expand a given hexstring to a specified length by replacing "." or ".."
|
"""Expand a given hexstring to a specified length by replacing "." or ".."
|
||||||
with a filler that is derived from the neighboring nibbles respective
|
with a filler that is derived from the neighboring nibbles respective
|
||||||
bytes. Usually this will be the nibble respective byte before "." or
|
bytes. Usually this will be the nibble respective byte before "." or
|
||||||
"..", execpt when the string begins with "." or "..", then the nibble
|
"..", except when the string begins with "." or "..", then the nibble
|
||||||
respective byte after "." or ".." is used.". In case the string cannot
|
respective byte after "." or ".." is used.". In case the string cannot
|
||||||
be expanded for some reason, the input string is returned unmodified.
|
be expanded for some reason, the input string is returned unmodified.
|
||||||
|
|
||||||
@@ -585,10 +586,138 @@ def parse_command_apdu(apdu: bytes) -> int:
|
|||||||
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
|
raise ValueError('invalid APDU (%s), too short!' % b2h(apdu))
|
||||||
|
|
||||||
|
|
||||||
|
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||||
|
def normalizeATR(atr):
|
||||||
|
"""Transform an ATR in list of integers.
|
||||||
|
valid input formats are
|
||||||
|
"3B A7 00 40 18 80 65 A2 08 01 01 52"
|
||||||
|
"3B:A7:00:40:18:80:65:A2:08:01:01:52"
|
||||||
|
|
||||||
|
Args:
|
||||||
|
atr: string
|
||||||
|
Returns:
|
||||||
|
list of bytes
|
||||||
|
|
||||||
|
>>> normalize("3B:A7:00:40:18:80:65:A2:08:01:01:52")
|
||||||
|
[59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82]
|
||||||
|
"""
|
||||||
|
atr = atr.replace(":", "")
|
||||||
|
atr = atr.replace(" ", "")
|
||||||
|
|
||||||
|
res = []
|
||||||
|
while len(atr) >= 2:
|
||||||
|
byte, atr = atr[:2], atr[2:]
|
||||||
|
res.append(byte)
|
||||||
|
if len(atr) > 0:
|
||||||
|
raise ValueError("warning: odd string, remainder: %r" % atr)
|
||||||
|
|
||||||
|
atr = [int(x, 16) for x in res]
|
||||||
|
return atr
|
||||||
|
|
||||||
|
|
||||||
|
# ATR handling code under GPL from parseATR: https://github.com/LudovicRousseau/pyscard-contrib
|
||||||
|
def decomposeATR(atr_txt):
|
||||||
|
"""Decompose the ATR in elementary fields
|
||||||
|
|
||||||
|
Args:
|
||||||
|
atr_txt: ATR as a hex bytes string
|
||||||
|
Returns:
|
||||||
|
dictionary of field and values
|
||||||
|
|
||||||
|
>>> decomposeATR("3B A7 00 40 18 80 65 A2 08 01 01 52")
|
||||||
|
{ 'T0': {'value': 167},
|
||||||
|
'TB': {1: {'value': 0}},
|
||||||
|
'TC': {2: {'value': 24}},
|
||||||
|
'TD': {1: {'value': 64}},
|
||||||
|
'TS': {'value': 59},
|
||||||
|
'atr': [59, 167, 0, 64, 24, 128, 101, 162, 8, 1, 1, 82],
|
||||||
|
'hb': {'value': [128, 101, 162, 8, 1, 1, 82]},
|
||||||
|
'hbn': 7}
|
||||||
|
"""
|
||||||
|
ATR_PROTOCOL_TYPE_T0 = 0
|
||||||
|
atr_txt = normalizeATR(atr_txt)
|
||||||
|
atr = {}
|
||||||
|
|
||||||
|
# the ATR itself as a list of integers
|
||||||
|
atr["atr"] = atr_txt
|
||||||
|
|
||||||
|
# store TS and T0
|
||||||
|
atr["TS"] = {"value": atr_txt[0]}
|
||||||
|
TDi = atr_txt[1]
|
||||||
|
atr["T0"] = {"value": TDi}
|
||||||
|
hb_length = TDi & 15
|
||||||
|
pointer = 1
|
||||||
|
# protocol number
|
||||||
|
pn = 1
|
||||||
|
|
||||||
|
# store number of historical bytes
|
||||||
|
atr["hbn"] = TDi & 0xF
|
||||||
|
|
||||||
|
while pointer < len(atr_txt):
|
||||||
|
# Check TAi is present
|
||||||
|
if (TDi | 0xEF) == 0xFF:
|
||||||
|
pointer += 1
|
||||||
|
if "TA" not in atr:
|
||||||
|
atr["TA"] = {}
|
||||||
|
atr["TA"][pn] = {"value": atr_txt[pointer]}
|
||||||
|
|
||||||
|
# Check TBi is present
|
||||||
|
if (TDi | 0xDF) == 0xFF:
|
||||||
|
pointer += 1
|
||||||
|
if "TB" not in atr:
|
||||||
|
atr["TB"] = {}
|
||||||
|
atr["TB"][pn] = {"value": atr_txt[pointer]}
|
||||||
|
|
||||||
|
# Check TCi is present
|
||||||
|
if (TDi | 0xBF) == 0xFF:
|
||||||
|
pointer += 1
|
||||||
|
if "TC" not in atr:
|
||||||
|
atr["TC"] = {}
|
||||||
|
atr["TC"][pn] = {"value": atr_txt[pointer]}
|
||||||
|
|
||||||
|
# Check TDi is present
|
||||||
|
if (TDi | 0x7F) == 0xFF:
|
||||||
|
pointer += 1
|
||||||
|
if "TD" not in atr:
|
||||||
|
atr["TD"] = {}
|
||||||
|
TDi = atr_txt[pointer]
|
||||||
|
atr["TD"][pn] = {"value": TDi}
|
||||||
|
if (TDi & 0x0F) != ATR_PROTOCOL_TYPE_T0:
|
||||||
|
atr["TCK"] = True
|
||||||
|
pn += 1
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Store historical bytes
|
||||||
|
atr["hb"] = {"value": atr_txt[pointer + 1 : pointer + 1 + hb_length]}
|
||||||
|
|
||||||
|
# Store TCK
|
||||||
|
last = pointer + 1 + hb_length
|
||||||
|
if "TCK" in atr:
|
||||||
|
try:
|
||||||
|
atr["TCK"] = {"value": atr_txt[last]}
|
||||||
|
except IndexError:
|
||||||
|
atr["TCK"] = {"value": -1}
|
||||||
|
last += 1
|
||||||
|
|
||||||
|
if len(atr_txt) > last:
|
||||||
|
atr["extra"] = atr_txt[last:]
|
||||||
|
|
||||||
|
if len(atr["hb"]["value"]) < hb_length:
|
||||||
|
missing = hb_length - len(atr["hb"]["value"])
|
||||||
|
if missing > 1:
|
||||||
|
(t1, t2) = ("s", "are")
|
||||||
|
else:
|
||||||
|
(t1, t2) = ("", "is")
|
||||||
|
atr["warning"] = "ATR is truncated: %d byte%s %s missing" % (missing, t1, t2)
|
||||||
|
|
||||||
|
return atr
|
||||||
|
|
||||||
|
|
||||||
class DataObject(abc.ABC):
|
class DataObject(abc.ABC):
|
||||||
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
|
"""A DataObject (DO) in the sense of ISO 7816-4. Contrary to 'normal' TLVs where one
|
||||||
simply has any number of different TLVs that may occur in any order at any point, ISO 7816
|
simply has any number of different TLVs that may occur in any order at any point, ISO 7816
|
||||||
has the habit of specifying TLV data but with very spcific ordering, or specific choices of
|
has the habit of specifying TLV data but with very specific ordering, or specific choices of
|
||||||
tags at specific points in a stream. This class tries to represent this."""
|
tags at specific points in a stream. This class tries to represent this."""
|
||||||
|
|
||||||
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
|
def __init__(self, name: str, desc: Optional[str] = None, tag: Optional[int] = None):
|
||||||
@@ -710,7 +839,7 @@ class TL0_DataObject(DataObject):
|
|||||||
|
|
||||||
|
|
||||||
class DataObjectCollection:
|
class DataObjectCollection:
|
||||||
"""A DataObjectCollection consits of multiple Data Objects identified by their tags.
|
"""A DataObjectCollection consists of multiple Data Objects identified by their tags.
|
||||||
A given encoded DO may contain any of them in any order, and may contain multiple instances
|
A given encoded DO may contain any of them in any order, and may contain multiple instances
|
||||||
of each DO."""
|
of each DO."""
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
[build-system]
|
[build-system]
|
||||||
requires = ["setuptools", "wheel"]
|
requires = ["setuptools", "wheel"]
|
||||||
build-backend = "setuptools.build_meta"
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[tool.pylint.main]
|
||||||
|
ignored-classes = ["twisted.internet.reactor"]
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
pyscard
|
pyscard
|
||||||
pyserial
|
pyserial
|
||||||
pytlv
|
pytlv
|
||||||
cmd2>=1.5
|
cmd2>=2.6.2,<3.0
|
||||||
jsonpath-ng
|
jsonpath-ng
|
||||||
construct>=2.10.70
|
construct>=2.10.70
|
||||||
bidict
|
bidict
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,7 +1,7 @@
|
|||||||
-----BEGIN CERTIFICATE-----
|
-----BEGIN CERTIFICATE-----
|
||||||
MIICgjCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
MIICgzCCAimgAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||||
SVQwHhcNMjQwNzA5MTUyOTM2WhcNMjUwODExMTUyOTM2WjAzMQ0wCwYDVQQKDARB
|
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
|
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFowFAYHKoZI
|
||||||
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
|
zj0CAQYJKyQDAwIIAQEHA0IABEwizNgsjQIh+dhUO3LhB7zJ/ZBU1mx1wOt0p73n
|
||||||
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
|
MOdhjvZbJwteguQ6eW+N7guvivvrilNiU3oC/WXHnkEZa7WjggEaMIIBFjAOBgNV
|
||||||
@@ -10,7 +10,7 @@ A1UdIAQNMAswCQYHZ4ESAQIBAzAdBgNVHQ4EFgQUPTMJg/OfzFvS5K1ophmnR0iu
|
|||||||
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
|
i50wHwYDVR0jBBgwFoAUwLxwujaSnUO0Z/9XVwUw5Xq4/NgwKQYDVR0RBCIwIIIZ
|
||||||
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
|
dGVzdHNtZHBwbHVzMS5leGFtcGxlLmNvbYgDiDcKMGEGA1UdHwRaMFgwKqAooCaG
|
||||||
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
|
JGh0dHA6Ly9jaS50ZXN0LmV4YW1wbGUuY29tL0NSTC1BLmNybDAqoCigJoYkaHR0
|
||||||
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0cA
|
cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUIuY3JsMAoGCCqGSM49BAMCA0gA
|
||||||
MEQCIHHmXEy9mgudh/VbK0hJwmX7eOgbvHLnlujrpQzvUd4uAiBFVJgSdzYvrmJ9
|
MEUCIQCfaGcMk+kuSJsbIyRPWttwWNftwQdHCQuu346PaiA2FAIgUrqhPw2um9gV
|
||||||
5yeIvmjHwxSMBgQp2dde7OtdVEK8Kw==
|
C+eWHaXio7WQh5L6VgLZzNifTQcldD4=
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
|
|||||||
Binary file not shown.
@@ -1,7 +1,7 @@
|
|||||||
-----BEGIN CERTIFICATE-----
|
-----BEGIN CERTIFICATE-----
|
||||||
MIICgzCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
MIICgjCCAiigAwIBAgIBCTAKBggqhkjOPQQDAjBEMRAwDgYDVQQDDAdUZXN0IENJ
|
||||||
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
MREwDwYDVQQLDAhURVNUQ0VSVDEQMA4GA1UECgwHUlNQVEVTVDELMAkGA1UEBhMC
|
||||||
SVQwHhcNMjQwNzA5MTUyODMzWhcNMjUwODExMTUyODMzWjAzMQ0wCwYDVQQKDARB
|
SVQwHhcNMjUwNjMwMTMxNDM4WhcNMjYwODAyMTMxNDM4WjAzMQ0wCwYDVQQKDARB
|
||||||
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
|
Q01FMSIwIAYDVQQDDBl0ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tMFkwEwYHKoZI
|
||||||
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
|
zj0CAQYIKoZIzj0DAQcDQgAEKCQwdc6O/R+uZ2g5QH2ybkzLQ3CUYhybOWEz8bJL
|
||||||
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
|
tQG4/k6yTT4NOS8lP28blGJws8opLjTbb3qHs6X2rJRfCKOCARowggEWMA4GA1Ud
|
||||||
@@ -10,7 +10,7 @@ VR0gBA0wCzAJBgdngRIBAgEDMB0GA1UdDgQWBBQn/vHyKRh+x4Pt9uApZKRRjVfU
|
|||||||
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
|
qTAfBgNVHSMEGDAWgBT1QXK9+YqV1ly+uIo4ocEdgAqFwzApBgNVHREEIjAgghl0
|
||||||
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
|
ZXN0c21kcHBsdXMxLmV4YW1wbGUuY29tiAOINwowYQYDVR0fBFowWDAqoCigJoYk
|
||||||
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
|
aHR0cDovL2NpLnRlc3QuZXhhbXBsZS5jb20vQ1JMLUEuY3JsMCqgKKAmhiRodHRw
|
||||||
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSQAw
|
Oi8vY2kudGVzdC5leGFtcGxlLmNvbS9DUkwtQi5jcmwwCgYIKoZIzj0EAwIDSAAw
|
||||||
RgIhAL1qQ/cnrCZC7UnnLJ8WeK+0aWUJFWh1cOlBEzw0NlTVAiEA25Vf4WHzwmJi
|
RQIhAL+1lp/hGsj87/5RqOX2u3hS/VSftDN7EPrHJJFnTXLRAiBVxemKIKmC7+W1
|
||||||
zkARzxJ1qB0qfBofuJrtfPM4gNJ4Quw=
|
+RsTY5I51R+Cyoq4l5TEU49eplo5bw==
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
/tmp/sm-dp-sessions-BRP
|
|
||||||
@@ -1 +0,0 @@
|
|||||||
/tmp/sm-dp-sessions-NIST
|
|
||||||
107
smpp_ota_apdu.py
Executable file
107
smpp_ota_apdu.py
Executable file
@@ -0,0 +1,107 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
from pprint import pprint as pp
|
||||||
|
|
||||||
|
from pySim.ota import OtaKeyset, OtaDialectSms
|
||||||
|
from pySim.utils import b2h, h2b
|
||||||
|
|
||||||
|
import smpplib.gsm
|
||||||
|
import smpplib.client
|
||||||
|
import smpplib.consts
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# if you want to know what's happening
|
||||||
|
logging.basicConfig(level='DEBUG')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Foo:
|
||||||
|
def smpp_rx_handler(self, pdu):
|
||||||
|
sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))
|
||||||
|
if pdu.short_message:
|
||||||
|
try:
|
||||||
|
dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, pdu.short_message)
|
||||||
|
except ValueError:
|
||||||
|
spi = self.spi.copy()
|
||||||
|
spi['por_shall_be_ciphered'] = False
|
||||||
|
spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
|
||||||
|
dec = self.ota_dialect.decode_resp(self.ota_keyset, spi, pdu.short_message)
|
||||||
|
pp(dec)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __init__(self, kic, kid, idx, tar):
|
||||||
|
# Two parts, UCS2, SMS with UDH
|
||||||
|
#parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(u'Привет мир!\n'*10)
|
||||||
|
|
||||||
|
client = smpplib.client.Client('localhost', 2775, allow_unknown_opt_params=True)
|
||||||
|
|
||||||
|
# Print when obtain message_id
|
||||||
|
client.set_message_sent_handler(
|
||||||
|
lambda pdu: sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id)))
|
||||||
|
#client.set_message_received_handler(
|
||||||
|
# lambda pdu: sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id)))
|
||||||
|
client.set_message_received_handler(self.smpp_rx_handler)
|
||||||
|
|
||||||
|
client.connect()
|
||||||
|
client.bind_transceiver(system_id='test', password='test')
|
||||||
|
|
||||||
|
self.client = client
|
||||||
|
|
||||||
|
|
||||||
|
self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=idx, kic=h2b(kic),
|
||||||
|
algo_auth='triple_des_cbc2', kid_idx=idx, kid=h2b(kid))
|
||||||
|
self.ota_keyset.cntr = 0xdadb
|
||||||
|
self.tar = h2b(tar)
|
||||||
|
|
||||||
|
self.ota_dialect = OtaDialectSms()
|
||||||
|
self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||||
|
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||||
|
|
||||||
|
|
||||||
|
def tx_sms_tpdu(self, tpdu: bytes):
|
||||||
|
self.client.send_message(
|
||||||
|
source_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||||
|
#source_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||||
|
# Make sure it is a byte string, not unicode:
|
||||||
|
source_addr='12',
|
||||||
|
|
||||||
|
dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
|
||||||
|
#dest_addr_npi=smpplib.consts.SMPP_NPI_ISDN,
|
||||||
|
# Make sure thease two params are byte strings, not unicode:
|
||||||
|
destination_addr='23',
|
||||||
|
short_message=tpdu,
|
||||||
|
|
||||||
|
data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
|
||||||
|
esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
|
||||||
|
protocol_id=0x7f,
|
||||||
|
#registered_delivery=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def tx_c_apdu(self, apdu: bytes):
|
||||||
|
logger.info("C-APDU: %s" % b2h(apdu))
|
||||||
|
# translate to Secured OTA RFM
|
||||||
|
secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
|
||||||
|
# add user data header
|
||||||
|
tpdu = b'\x02\x70\x00' + secured
|
||||||
|
# send via SMPP
|
||||||
|
self.tx_sms_tpdu(tpdu)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
|
parser.add_argument('-c', '--kic')
|
||||||
|
parser.add_argument('-d', '--kid')
|
||||||
|
parser.add_argument('-i', '--idx', type=int, default=1)
|
||||||
|
parser.add_argument('-t', '--tar', default='b00011')
|
||||||
|
parser.add_argument('apdu', default="", nargs='+')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
f = Foo(args.kic, args.kid, args.idx, args.tar)
|
||||||
|
print("initialized, sending APDU")
|
||||||
|
f.tx_c_apdu(h2b("".join(args.apdu)))
|
||||||
|
|
||||||
|
f.client.listen()
|
||||||
@@ -2,7 +2,7 @@ Detected UICC Add-on "SIM"
|
|||||||
Detected UICC Add-on "GSM-R"
|
Detected UICC Add-on "GSM-R"
|
||||||
Detected UICC Add-on "RUIM"
|
Detected UICC Add-on "RUIM"
|
||||||
Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
|
Can't read AIDs from SIM -- 'list' object has no attribute 'lower'
|
||||||
warning: EF.DIR seems to be empty!
|
EF.DIR seems to be empty!
|
||||||
ADF.ECASD: a0000005591010ffffffff8900000200
|
ADF.ECASD: a0000005591010ffffffff8900000200
|
||||||
ADF.ISD-R: a0000005591010ffffffff8900000100
|
ADF.ISD-R: a0000005591010ffffffff8900000100
|
||||||
ISIM: a0000000871004
|
ISIM: a0000000871004
|
||||||
|
|||||||
5
tests/unittests/test_card_key_provider.csv
Normal file
5
tests/unittests/test_card_key_provider.csv
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"card_type_id","formfactor_id","imsi","iccid","pin1","puk1","pin2","puk2","ki","adm1","adm2","proprietary","kic1","kic2","kic3","kid1","kid2","kid3","kik1","kik2","kik3","msisdn","acc","opc"
|
||||||
|
"myCardType","3FF","901700000000001","8988211000000000001","1234","12345678","1223","12345678","AAAAAAAAAAA5435425AAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 01","BBBBBBBBBB3324BBBBBBBB21212BBBBB","CC7654CCCCCCCCCCCCCCCCCCCCCCCCCC","DDDD90DDDDDDDDDDDDDDDDDD767DDDDD","EEEEEE567657567567EEEEEEEEEEEEEE","FFFFFFFFFFFFFFFFFFF56765765FFFFF","11111567811111111111111111111111","22222222222222222227669999222222","33333333333333333333333333333333","44444444444444445234544444444444","55555555555","0001","66666666666666666666666666666666"
|
||||||
|
"myCardType","3FF","901700000000002","8988211000000000002","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAA3425AAAA","10101010","9999999999999999","proprietary data 02","BBBBBB421BBBBBBBBBB12BBBBBBBBBBB","CCCCCCCCCC3456CCCCCCCCCCCCCCCCCC","DDDDDDDDD567657DDDD2DDDDDDDDDDDD","EEEEEEEE56756EEEEEEEEE567657EEEE","FFFFF567657FFFFFFFFFFFFFFFFFFFFF","11111111111146113433411576511111","22222222222223432225765222222222","33333333333333523453333333333333","44425435234444444444446544444444","55555555555","0001","66666666666666266666666666666666"
|
||||||
|
"myCardType","3FF","901700000000003","8988211000000000003","1234","12345678","1223","12345678","AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","10101010","9999999999999999","proprietary data 03","BBBBBBB45678BBBB756765BBBBBBBBBB","CCCCCCCCCCCCCC76543CCCC56765CCCC","DDDDDDDDDDDDDDDDDD5676575DDDDDDD","EEEEEEEEEEEEEEEEEE56765EEEEEEEEE","FFFFFFFFFFFFFFF567657FFFFFFFFFFF","11111111119876511111111111111111","22222222222444422222222222576522","33333332543333576733333333333333","44444444444567657567444444444444","55555555555","0001","66666675676575666666666666666666"
|
||||||
|
|
||||||
|
152
tests/unittests/test_card_key_provider.py
Normal file
152
tests/unittests/test_card_key_provider.py
Normal file
@@ -0,0 +1,152 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
from pySim.card_key_provider import *
|
||||||
|
|
||||||
|
class TestCardKeyProviderCsv(unittest.TestCase):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||||
|
"OPC" : "000102030405065545645645645D0E0F",
|
||||||
|
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||||
|
"KID1" : "00040267840507667609045645645E0F",
|
||||||
|
"KIK1" : "0001020307687607668678678C0D0E0F",
|
||||||
|
"KIC2" : "000142457594860706090A0B0688678F",
|
||||||
|
"KID2" : "600102030405649468690A0B0C0D648F",
|
||||||
|
"KIK2" : "00010203330506070496330B08640E0F",
|
||||||
|
"KIC3" : "000104030405064684686A068C0D0E0F",
|
||||||
|
"KID3" : "00010243048468070809060B0C0D0E0F",
|
||||||
|
"KIK3" : "00010204040506070809488B0C0D0E0F"}
|
||||||
|
|
||||||
|
csv_file_path = os.path.dirname(os.path.abspath(__file__)) + "/test_card_key_provider.csv"
|
||||||
|
card_key_provider_register(CardKeyProviderCsv(csv_file_path, column_keys))
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def test_card_key_provider_get(self):
|
||||||
|
test_data = [{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||||
|
'KI': '48a6d5f60567d45299e3ba08594009e7', 'ADM1': '10101010',
|
||||||
|
'ADM2': '9999999999999999', 'KIC1': '3eb8567fa0b4b1e63bcab13bff5f2702',
|
||||||
|
'KIC2': 'fd6c173a5b3f04b563808da24237fb46',
|
||||||
|
'KIC3': '66c8c848e5dff69d70689d155d44f323',
|
||||||
|
'KID1': 'd78accce870332dced467c173244dd94',
|
||||||
|
'KID2': 'b3bf050969747b2d2c9389e127a3d791',
|
||||||
|
'KID3': '40a77deb50d260b3041bbde1b5040625',
|
||||||
|
'KIK1': '451b503239d818ea34421aa9c2a8887a',
|
||||||
|
'KIK2': '967716f5fca8ae179f87f76524d1ae6b',
|
||||||
|
'KIK3': '0884db5eee5409a00fc1bbc57ac52541',
|
||||||
|
'OPC': '81817574c1961dd272ad080eb2caf279'}, 'ICCID' :"8988211000000000001"},
|
||||||
|
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||||
|
'KI': 'e94d7fa6fb92375dae86744ff6ecef49', 'ADM1': '10101010',
|
||||||
|
'ADM2': '9999999999999999', 'KIC1': '79b4e39387c66253da68f653381ded44',
|
||||||
|
'KIC2': '560561b5dba89c1da8d1920049e5e4f7',
|
||||||
|
'KIC3': '79ff35e84e39305a119af8c79f84e8e5',
|
||||||
|
'KID1': '233baf89122159553d67545ecedcf8e0',
|
||||||
|
'KID2': '8fc2874164d7a8e40d72c968bc894ab8',
|
||||||
|
'KID3': '2e3320f0dda85054d261be920fbfa065',
|
||||||
|
'KIK1': 'd51b1b17630103d1672a3e9e0e4827ed',
|
||||||
|
'KIK2': 'd01edbc48be555139506b0d7982bf7ff',
|
||||||
|
'KIK3': 'a6487a5170849e8e0a03026afea91f5a',
|
||||||
|
'OPC': '6b0d19ef28bd12f2daac31828d426939'}, 'ICCID' :"8988211000000000002"},
|
||||||
|
{'EXPECTED' : {'PIN1': '1234', 'PUK1': '12345678', 'PIN2': '1223', 'PUK2': '12345678',
|
||||||
|
'KI': '3cdec1552ef433a89f327905213c5a6e', 'ADM1': '10101010',
|
||||||
|
'ADM2': '9999999999999999', 'KIC1': '72986b13ce505e12653ad42df5cfca13',
|
||||||
|
'KIC2': '8f0d1e58b01e833773e5562c4940674d',
|
||||||
|
'KIC3': '9c72ba5a14d54f489edbffd3d8802f03',
|
||||||
|
'KID1': 'd23a42995df9ca83f74b2cfd22695526',
|
||||||
|
'KID2': '5c3a189d12aa1ac6614883d7de5e6c8c',
|
||||||
|
'KID3': 'a6ace0d303a2b38a96b418ab83c16725',
|
||||||
|
'KIK1': 'bf2319467d859c12527aa598430caef2',
|
||||||
|
'KIK2': '6a4c459934bea7e40787976b8881ab01',
|
||||||
|
'KIK3': '91cd02c38b5f68a98cc90a1f2299538f',
|
||||||
|
'OPC': '6df46814b1697daca003da23808bbbc3'}, 'ICCID' :"8988211000000000003"}]
|
||||||
|
|
||||||
|
for t in test_data:
|
||||||
|
result = card_key_provider_get(["PIN1","PUK1","PIN2","PUK2","KI","ADM1","ADM2","KIC1",
|
||||||
|
"KIC2","KIC3","KID1","KID2","KID3","KIK1","KIK2","KIK3","OPC"],
|
||||||
|
"ICCID", t.get('ICCID'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
result = card_key_provider_get(["PIN1","puk1","PIN2","PUK2","KI","adm1","ADM2","KIC1",
|
||||||
|
"KIC2","kic3","KID1","KID2","KID3","kik1","KIK2","KIK3","OPC"],
|
||||||
|
"iccid", t.get('ICCID'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
|
||||||
|
|
||||||
|
def test_card_key_provider_get_field(self):
|
||||||
|
test_data = [{'EXPECTED' : "3eb8567fa0b4b1e63bcab13bff5f2702", 'ICCID' :"8988211000000000001"},
|
||||||
|
{'EXPECTED' : "79b4e39387c66253da68f653381ded44", 'ICCID' :"8988211000000000002"},
|
||||||
|
{'EXPECTED' : "72986b13ce505e12653ad42df5cfca13", 'ICCID' :"8988211000000000003"}]
|
||||||
|
|
||||||
|
for t in test_data:
|
||||||
|
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
for t in test_data:
|
||||||
|
result = card_key_provider_get_field("kic1", "iccid", t.get('ICCID'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
|
||||||
|
|
||||||
|
class TestCardKeyFieldCryptor(unittest.TestCase):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
|
||||||
|
"OPC" : "000102030405065545645645645D0E0F",
|
||||||
|
"KIC1" : "06410203546406456456450B0C0D0E0F",
|
||||||
|
"UICC_SCP03" : "00040267840507667609045645645E0F"}
|
||||||
|
self.crypt = CardKeyFieldCryptor(transport_keys)
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def test_encrypt_field(self):
|
||||||
|
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||||
|
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||||
|
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
|
||||||
|
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||||
|
|
||||||
|
for t in test_data:
|
||||||
|
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
|
||||||
|
def test_decrypt_field(self):
|
||||||
|
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
|
||||||
|
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
|
||||||
|
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
|
||||||
|
|
||||||
|
for t in test_data:
|
||||||
|
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
|
||||||
|
self.assertEqual(result, t.get('EXPECTED'))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
121
tests/unittests/test_log.py
Executable file
121
tests/unittests/test_log.py
Executable file
@@ -0,0 +1,121 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# (C) 2025 by Sysmocom s.f.m.c. GmbH
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
# Author: Philipp Maier <pmaier@sysmocom.de>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import logging
|
||||||
|
from pySim.log import PySimLogger
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
from inspect import currentframe, getframeinfo
|
||||||
|
|
||||||
|
log = PySimLogger.get("TEST")
|
||||||
|
|
||||||
|
TEST_MSG_DEBUG = "this is a debug message"
|
||||||
|
TEST_MSG_INFO = "this is an info message"
|
||||||
|
TEST_MSG_WARNING = "this is a warning message"
|
||||||
|
TEST_MSG_ERROR = "this is an error message"
|
||||||
|
TEST_MSG_CRITICAL = "this is a critical message"
|
||||||
|
|
||||||
|
expected_message = None
|
||||||
|
|
||||||
|
class PySimLogger_Test(unittest.TestCase):
|
||||||
|
|
||||||
|
def __test_01_safe_defaults_one(self, callback, message:str):
|
||||||
|
# When log messages are sent to an unconfigured PySimLogger class, we expect the unmodified message being
|
||||||
|
# logged to stdout, just as if it were printed via a normal print() statement.
|
||||||
|
log_output = io.StringIO()
|
||||||
|
sys.stdout = log_output
|
||||||
|
callback(message)
|
||||||
|
assert(log_output.getvalue().strip() == message)
|
||||||
|
sys.stdout = sys.__stdout__
|
||||||
|
|
||||||
|
def test_01_safe_defaults(self):
|
||||||
|
# When log messages are sent to an unconfigured PySimLogger class, we expect that all messages are logged,
|
||||||
|
# regardless of the logging level.
|
||||||
|
self.__test_01_safe_defaults_one(log.debug, TEST_MSG_DEBUG)
|
||||||
|
self.__test_01_safe_defaults_one(log.info, TEST_MSG_INFO)
|
||||||
|
self.__test_01_safe_defaults_one(log.warning, TEST_MSG_WARNING)
|
||||||
|
self.__test_01_safe_defaults_one(log.error, TEST_MSG_ERROR)
|
||||||
|
self.__test_01_safe_defaults_one(log.critical, TEST_MSG_CRITICAL)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _test_print_callback(message):
|
||||||
|
assert(message.strip() == expected_message)
|
||||||
|
|
||||||
|
def test_02_normal(self):
|
||||||
|
# When the PySimLogger is set up with its default values, we expect formatted log messages on all logging
|
||||||
|
# levels.
|
||||||
|
global expected_message
|
||||||
|
PySimLogger.setup(self._test_print_callback)
|
||||||
|
expected_message = "DEBUG: " + TEST_MSG_DEBUG
|
||||||
|
log.debug(TEST_MSG_DEBUG)
|
||||||
|
expected_message = "INFO: " + TEST_MSG_INFO
|
||||||
|
log.info(TEST_MSG_INFO)
|
||||||
|
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||||
|
log.warning(TEST_MSG_WARNING)
|
||||||
|
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||||
|
log.error(TEST_MSG_ERROR)
|
||||||
|
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||||
|
log.critical(TEST_MSG_CRITICAL)
|
||||||
|
|
||||||
|
def test_03_verbose(self):
|
||||||
|
# When the PySimLogger is set up with its default values, we expect verbose formatted log messages on all
|
||||||
|
# logging levels.
|
||||||
|
global expected_message
|
||||||
|
PySimLogger.setup(self._test_print_callback)
|
||||||
|
PySimLogger.set_verbose(True)
|
||||||
|
frame = currentframe()
|
||||||
|
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - DEBUG: " + TEST_MSG_DEBUG
|
||||||
|
log.debug(TEST_MSG_DEBUG)
|
||||||
|
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - INFO: " + TEST_MSG_INFO
|
||||||
|
log.info(TEST_MSG_INFO)
|
||||||
|
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - WARNING: " + TEST_MSG_WARNING
|
||||||
|
log.warning(TEST_MSG_WARNING)
|
||||||
|
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - ERROR: " + TEST_MSG_ERROR
|
||||||
|
log.error(TEST_MSG_ERROR)
|
||||||
|
expected_message = __name__ + "." + str(getframeinfo(frame).lineno + 1) + " -- TEST - CRITICAL: " + TEST_MSG_CRITICAL
|
||||||
|
log.critical(TEST_MSG_CRITICAL)
|
||||||
|
|
||||||
|
def test_04_level(self):
|
||||||
|
# When the PySimLogger is set up with its default values, we expect formatted log messages but since we will
|
||||||
|
# limit the log level to INFO, we should not see any messages of level DEBUG
|
||||||
|
global expected_message
|
||||||
|
PySimLogger.setup(self._test_print_callback)
|
||||||
|
PySimLogger.set_level(logging.INFO)
|
||||||
|
|
||||||
|
# We test this in non verbose mode, this will also confirm that disabeling the verbose mode works.
|
||||||
|
PySimLogger.set_verbose(False)
|
||||||
|
|
||||||
|
# Debug messages should not appear
|
||||||
|
expected_message = None
|
||||||
|
log.debug(TEST_MSG_DEBUG)
|
||||||
|
|
||||||
|
# All other messages should appear normally
|
||||||
|
expected_message = "INFO: " + TEST_MSG_INFO
|
||||||
|
log.info(TEST_MSG_INFO)
|
||||||
|
expected_message = "WARNING: " + TEST_MSG_WARNING
|
||||||
|
log.warning(TEST_MSG_WARNING)
|
||||||
|
expected_message = "ERROR: " + TEST_MSG_ERROR
|
||||||
|
log.error(TEST_MSG_ERROR)
|
||||||
|
expected_message = "CRITICAL: " + TEST_MSG_CRITICAL
|
||||||
|
log.critical(TEST_MSG_CRITICAL)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user