mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
pySim.ota.OtaDialectSms: Implement command decoding
So far we only implemented command encoding and response decoding. Let's also add command decoding, which is useful for example when decoding protocol traces. Change-Id: Id666cea8a91a854209f3c19c1f09b512bb493c85
This commit is contained in:
45
pySim/ota.py
45
pySim/ota.py
@@ -18,7 +18,7 @@
|
||||
import zlib
|
||||
import abc
|
||||
import struct
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
from construct import Enum, Int8ub, Int16ub, Struct, Bytes, GreedyBytes, BitsInteger, BitStruct
|
||||
from construct import Flag, Padding, Switch, this
|
||||
|
||||
@@ -388,6 +388,49 @@ class OtaDialectSms(OtaDialect):
|
||||
|
||||
return envelope_data
|
||||
|
||||
def decode_cmd(self, otak: OtaKeyset, encoded: bytes) -> Tuple[bytes, dict, bytes]:
|
||||
"""Decode an encoded (encrypted, signed) OTA SMS Command-APDU."""
|
||||
if True: # TODO: how to decide?
|
||||
cpl = int.from_bytes(encoded[:2], 'big')
|
||||
part_head = encoded[2:2+8]
|
||||
ciph = encoded[2+8:]
|
||||
envelope_data = otak.crypt.decrypt(ciph)
|
||||
else:
|
||||
part_head = encoded[:8]
|
||||
envelope_data = encoded[8:]
|
||||
|
||||
hdr_dec = self.hdr_construct.parse(part_head)
|
||||
|
||||
# strip counter part from front of envelope_data
|
||||
part_cnt = envelope_data[:6]
|
||||
cntr = int.from_bytes(part_cnt[:5], 'big')
|
||||
pad_cnt = int.from_bytes(part_cnt[5:], 'big')
|
||||
envelope_data = envelope_data[6:]
|
||||
|
||||
spi = hdr_dec['spi']
|
||||
if spi['rc_cc_ds'] == 'cc':
|
||||
# split cc from front of APDU
|
||||
cc = envelope_data[:8]
|
||||
apdu = envelope_data[8:]
|
||||
# verify CC
|
||||
temp_data = cpl.to_bytes(2, 'big') + part_head + part_cnt + apdu
|
||||
otak.auth.check_sig(temp_data, cc)
|
||||
elif spi['rc_cc_ds'] == 'rc':
|
||||
# CRC32
|
||||
crc32_rx = int.from_bytes(envelope_data[:4], 'big')
|
||||
# FIXME: crc32_computed = zlip.crc32(
|
||||
# FIXME: verify RC
|
||||
raise NotImplementedError
|
||||
apdu = envelope_data[4:]
|
||||
elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
|
||||
apdu = envelope_data
|
||||
else:
|
||||
raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
|
||||
|
||||
apdu = apdu[:len(apdu)-pad_cnt]
|
||||
return hdr_dec['tar'], spi, apdu
|
||||
|
||||
|
||||
def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
|
||||
if isinstance(data, str):
|
||||
data = h2b(data)
|
||||
|
||||
@@ -65,14 +65,19 @@ class Test_SMS_AES128(unittest.TestCase):
|
||||
|
||||
def test_cmd_aes128_ciphered(self):
|
||||
spi = self.spi_base
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40004023f00'))
|
||||
apdu = h2b('00a40004023f00')
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, apdu)
|
||||
self.assertEqual(b2h(r), '00281506192222b00011e87cceebb2d93083011ce294f93fc4d8de80da1abae8c37ca3e72ec4432e5058')
|
||||
|
||||
|
||||
# also test decoder
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, r)
|
||||
self.assertEqual(b2h(apdu), b2h(dec_apdu))
|
||||
self.assertEqual(b2h(dec_tar), b2h(self.tar))
|
||||
self.assertEqual(dec_spi, spi)
|
||||
|
||||
|
||||
class Test_SMS_3DES(unittest.TestCase):
|
||||
tar = h2b('b00000')
|
||||
apdu = h2b('00a40000023f00')
|
||||
"""Test the OtaDialectSms for 3DES algorithms."""
|
||||
def __init__(self, foo, **kwargs):
|
||||
super().__init__(foo, **kwargs)
|
||||
@@ -134,21 +139,26 @@ class Test_SMS_3DES(unittest.TestCase):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = True
|
||||
spi['rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, self.apdu)
|
||||
self.assertEqual(b2h(r), '00180d04193535b00000e3ec80a849b554421276af3883927c20')
|
||||
# also test decoder
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(self.od, r)
|
||||
self.assertEqual(b2h(self.apdu), b2h(dec_apdu))
|
||||
self.assertEqual(b2h(dec_tar), b2h(self.tar))
|
||||
self.assertEqual(dec_spi, spi)
|
||||
|
||||
def test_cmd_3des_signed(self):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = False
|
||||
spi['rc_cc_ds'] = 'cc'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, self.apdu)
|
||||
self.assertEqual(b2h(r), '1502193535b00000000000000000072ea17bdb72060e00a40000023f00')
|
||||
|
||||
def test_cmd_3des_none(self):
|
||||
spi = self.spi_base
|
||||
spi['ciphering'] = False
|
||||
spi['rc_cc_ds'] = 'no_rc_cc_ds'
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
|
||||
r = self.dialect.encode_cmd(self.od, self.tar, spi, self.apdu)
|
||||
self.assertEqual(b2h(r), '0d00193535b0000000000000000000a40000023f00')
|
||||
|
||||
|
||||
@@ -273,6 +283,12 @@ class SmsOtaTestCase(OtaTestCase):
|
||||
#print("tpdu: %s" % b2h(tpdu.to_bytes()))
|
||||
self.assertEqual(b2h(tpdu.to_bytes()), t['request']['encoded_tpdu'])
|
||||
|
||||
# also test decoder
|
||||
dec_tar, dec_spi, dec_apdu = self.dialect.decode_cmd(kset, outp)
|
||||
self.assertEqual(b2h(t['request']['apdu']), b2h(dec_apdu))
|
||||
self.assertEqual(b2h(dec_tar), b2h(self.tar))
|
||||
self.assertEqual(dec_spi, t['spi'])
|
||||
|
||||
def test_decode_resp(self):
|
||||
for t in SmsOtaTestCase.testdatasets:
|
||||
with self.subTest(name=t['name']):
|
||||
|
||||
Reference in New Issue
Block a user