"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115.""" # (C) 2021-2022 by Harald Welte # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from pySim.construct import * from pySim.utils import b2h from pySim.sms import UserDataHeader from construct import * from bidict import bidict import zlib import abc import struct # ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS # 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP # CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data # CAT_TP TCP/IP SMS # CPI 0x01 0x01 =IEIa=70,len=0 # CHI NULL NULL NULL # CPI, CPL and CHL included in RC/CC/DS true true # RPI 0x02 0x02 =IEIa=71,len=0 # RHI NULL NULL # RPI, RPL and RHL included in RC/CC/DS true true # packet-id 0-bf,ff 0-bf,ff # identification packet false 102 225 tbl 6 # KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK # TS 102 225 Table 5 ota_status_codes = bidict({ 0x00: 'PoR OK', 0x01: 'RC/CC/DS failed', 0x02: 'CNTR low', 0x03: 'CNTR high', 0x04: 'CNTR blocked', 0x05: 'Ciphering error', 0x06: 'Unidentified security error', 0x07: 'Insufficient memory', 0x08: 'more time', 0x09: 'TAR unknown', 0x0a: 'Insufficient security level', 0x0b: 'Actual Response in SMS-SUBMIT', # 31.115 0x0c: 'Actual Response in USSD', # 31.115 }) # ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7 ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3, cntr_blocked=4, ciphering_error=5, undefined_security_error=6, insufficient_memory=7, more_time_needed=8, tar_unknown=9, insufficient_security_level=0x0A, actual_response_sms_submit=0x0B, actual_response_ussd=0x0C) # ETSI TS 102 226 Section 5.1.2 CompactRemoteResp = Struct('number_of_commands'/Int8ub, 'last_status_word'/HexAdapter(Bytes(2)), 'last_response_data'/HexAdapter(GreedyBytes)) RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3) # TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2 SPI = BitStruct( # first octet Padding(3), 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1, counter_must_be_higher=2, counter_must_be_lower=3), 'ciphering'/Flag, 'rc_cc_ds'/RC_CC_DS, # second octet Padding(2), 'por_in_submit'/Flag, 'por_shall_be_ciphered'/Flag, 'por_rc_cc_ds'/RC_CC_DS, 'por'/Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2) ) # TS 102 225 Section 5.1.2 KIC = BitStruct('key'/BitsInteger(4), 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9, aes_cbc=2) ) # TS 102 225 Section 5.1.3.1 KID_CC = BitStruct('key'/BitsInteger(4), 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9, aes_cmac=2) ) # TS 102 225 Section 5.1.3.2 KID_RC = BitStruct('key'/BitsInteger(4), 'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3) ) SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub, 'cmd_hdr_len'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }), 'tar'/Bytes(3), 'secured_data'/GreedyBytes) class OtaKeyset: """The OTA related data (key material, counter) to be used in encrypt/decrypt.""" def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes, algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0): self.algo_crypt = algo_crypt self.kic = bytes(kic) self.kic_idx = kic_idx self.algo_auth = algo_auth self.kid = bytes(kid) self.kid_idx = kid_idx self.cntr = cntr @property def auth(self): """Return an instance of the matching OtaAlgoAuth.""" return OtaAlgoAuth.fromKeyset(self) @property def crypt(self): """Return an instance of the matching OtaAlgoCrypt.""" return OtaAlgoCrypt.fromKeyset(self) class OtaCheckError(Exception): pass class OtaDialect(abc.ABC): """Base Class for OTA dialects such as SMS, BIP, ...""" def _compute_sig_len(self, spi:SPI): if spi['rc_cc_ds'] == 'no_rc_cc_ds': return 0 elif spi['rc_cc_ds'] == 'rc': # CRC-32 return 4 elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC) # TODO: this is not entirely correct, as in AES case it could be 4 or 8 return 8 else: raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds']) @abc.abstractmethod def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes: pass @abc.abstractmethod def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> bytes: pass from Crypto.Cipher import DES, DES3, AES from Crypto.Hash import CMAC class OtaAlgo(abc.ABC): iv = b'\x00\x00\x00\x00\x00\x00\x00\x00' blocksize = None enum_name = None @staticmethod def _get_padding(in_len: int, multiple: int, padding: int = 0): """Return padding bytes towards multiple of N.""" if in_len % multiple == 0: return b'' pad_cnt = multiple - (in_len % multiple) return b'\x00' * pad_cnt @staticmethod def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0): """Pad input bytes to multiple of N.""" return indat + OtaAlgo._get_padding(len(indat), multiple, padding) def pad_to_blocksize(self, indat: bytes, padding: int = 0): """Pad the given input data to multiple of the cipher block size.""" return self._pad_to_multiple(indat, self.blocksize, padding) def __init__(self, otak: OtaKeyset): self.otak = otak def __str__(self): return self.__class__.__name__ class OtaAlgoCrypt(OtaAlgo, abc.ABC): def __init__(self, otak: OtaKeyset): if self.enum_name != otak.algo_crypt: raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt)) super().__init__(otak) def encrypt(self, data:bytes) -> bytes: """Encrypt given input bytes using the key material given in constructor.""" padded_data = self.pad_to_blocksize(data) return self._encrypt(data) def decrypt(self, data:bytes) -> bytes: """Decrypt given input bytes using the key material given in constructor.""" return self._decrypt(data) @abc.abstractmethod def _encrypt(self, data:bytes) -> bytes: """Actual implementation, to be implemented by derived class.""" pass @abc.abstractmethod def _decrypt(self, data:bytes) -> bytes: """Actual implementation, to be implemented by derived class.""" pass @classmethod def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt': """Resolve the class for the encryption algorithm of otak and instantiate it.""" for subc in cls.__subclasses__(): if subc.enum_name == otak.algo_crypt: return subc(otak) raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth) class OtaAlgoAuth(OtaAlgo, abc.ABC): def __init__(self, otak: OtaKeyset): if self.enum_name != otak.algo_auth: raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt)) super().__init__(otak) def sign(self, data:bytes) -> bytes: """Compute the CC/CR check bytes for the input data using key material given in constructor.""" padded_data = self.pad_to_blocksize(data) sig = self._sign(padded_data) return sig def check_sig(self, data:bytes, cc_received:bytes): """Compute the CC/CR check bytes for the input data and compare against cc_received.""" cc = self.sign(data) if cc_received != cc: raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc))) @abc.abstractmethod def _sign(self, data:bytes) -> bytes: """Actual implementation, to be implemented by derived class.""" pass @classmethod def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth': """Resolve the class for the authentication algorithm of otak and instantiate it.""" for subc in cls.__subclasses__(): if subc.enum_name == otak.algo_auth: return subc(otak) raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth) class OtaAlgoCryptDES(OtaAlgoCrypt): """DES is insecure. For backwards compatibility with pre-Rel8""" name = 'DES' enum_name = 'single_des' blocksize = 8 def _encrypt(self, data:bytes) -> bytes: cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv) return cipher.encrypt(data) def _decrypt(self, data:bytes) -> bytes: cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv) return cipher.decrypt(data) class OtaAlgoAuthDES(OtaAlgoAuth): """DES is insecure. For backwards compatibility with pre-Rel8""" name = 'DES' enum_name = 'single_des' blocksize = 8 def _sign(self, data:bytes) -> bytes: cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv) ciph = cipher.encrypt(data) return ciph[len(ciph) - 8:] class OtaAlgoCryptDES3(OtaAlgoCrypt): name = '3DES' enum_name = 'triple_des_cbc2' blocksize = 8 def _encrypt(self, data:bytes) -> bytes: cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv) return cipher.encrypt(data) def _decrypt(self, data:bytes) -> bytes: cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv) return cipher.decrypt(data) class OtaAlgoAuthDES3(OtaAlgoAuth): name = '3DES' enum_name = 'triple_des_cbc2' blocksize = 8 def _sign(self, data:bytes) -> bytes: cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv) ciph = cipher.encrypt(data) return ciph[len(ciph) - 8:] class OtaAlgoCryptAES(OtaAlgoCrypt): name = 'AES' enum_name = 'aes_cbc' blocksize = 16 # TODO: is this needed? def _encrypt(self, data:bytes) -> bytes: cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv) return cipher.encrypt(data) def _decrypt(self, data:bytes) -> bytes: cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv) return cipher.decrypt(data) class OtaAlgoAuthAES(OtaAlgoAuth): name = 'AES' enum_name = 'aes_cmac' blocksize = 16 # TODO: is this needed? def _sign(self, data:bytes) -> bytes: cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8) cmac.update(data) ciph = cmac.digest() return ciph[len(ciph) - 8:] class OtaDialectSms(OtaDialect): """OTA dialect for SMS based transport, as described in 3GPP TS 31.115.""" SmsResponsePacket = Struct('rpl'/Int16ub, 'rhl'/Int8ub, 'tar'/Bytes(3), 'cntr'/Bytes(5), 'pcntr'/Int8ub, 'response_status'/ResponseStatus, 'cc_rc'/Bytes(this.rhl-10), 'secured_data'/GreedyBytes) def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes: # length of signature in octets len_sig = self._compute_sig_len(spi) pad_cnt = 0 if spi['ciphering']: # ciphering is requested # append padding bytes to end up with blocksize len_cipher = 6 + len_sig + len(apdu) apdu += otak.crypt._get_padding(len_cipher, otak.crypt.blocksize) kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt} kid = {'key': otak.kid_idx, 'algo': otak.algo_auth} # CHL = number of octets from (and including) SPI to the end of RC/CC/DS # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1) chl = 13 + len_sig # CHL + SPI (+ KIC + KID) c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3)) part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar}) #print("part_head: %s" % b2h(part_head)) # CNTR + PCNTR (CNTR not used) part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big') #print("part_cnt: %s" % b2h(part_cnt)) envelope_data = part_head + part_cnt + apdu #print("envelope_data: %s" % b2h(envelope_data)) # 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering # CPL from and including CPI to end of secured data, including any padding for ciphering cpl = len(envelope_data) + len_sig envelope_data = cpl.to_bytes(2, 'big') + envelope_data #print("envelope_data with cpl: %s" % b2h(envelope_data)) if spi['rc_cc_ds'] == 'cc': cc = otak.auth.sign(envelope_data) envelope_data = part_cnt + cc + apdu elif spi['rc_cc_ds'] == 'rc': # CRC32 crc32 = zlib.crc32(envelope_data) & 0xffffffff envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu elif spi['rc_cc_ds'] == 'no_rc_cc_ds': envelope_data = part_cnt + apdu else: raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds']) #print("envelope_data with sig: %s" % b2h(envelope_data)) # encrypt as needed if spi['ciphering']: # ciphering is requested ciph = otak.crypt.encrypt(envelope_data) envelope_data = part_head + ciph # prefix with another CPL cpl = len(envelope_data) envelope_data = cpl.to_bytes(2, 'big') + envelope_data else: envelope_data = part_head + envelope_data #print("envelope_data: %s" % b2h(envelope_data)) return envelope_data def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> bytes: if isinstance(data, str): data = h2b(data) # plain-text POR: 027100000e0ab000110000000000000001612f # UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS # 02 71 00 000e 0a b00011 0000000000 00 00 01 612f # POR with CC: 027100001612b000110000000000000055f47118381175fb01612f # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c if data[0] != 0x02: raise ValueError('Unexpected UDL=0x%02x' % data[0]) udhd, remainder = UserDataHeader.fromBytes(data) if not udhd.has_ie(0x71): raise ValueError('RPI 0x71 not found in UDH') rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered res = self.SmsResponsePacket.parse(remainder) if spi['por_shall_be_ciphered']: # decrypt ciphered_part = remainder[6:] deciph = otak.crypt.decrypt(ciphered_part) temp_data = rph_rhl_tar + deciph res = self.SmsResponsePacket.parse(temp_data) # remove specified number of padding bytes, if any if res['pcntr'] != 0: # this conditional is needed as python [:-0] renders an empty return! res['secured_data'] = res['secured_data'][:-res['pcntr']] remainder = temp_data # is there a CC/RC present? len_sig = res['rhl'] - 10 if spi['por_rc_cc_ds'] == 'no_rc_cc_ds': if len_sig: raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig) elif spi['por_rc_cc_ds'] == 'cc': # verify signature # UDH is part of CC/RC! udh = data[:3] # RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC rpl_rhl_tar_cntr_pcntr_sts = remainder[:13] # remove the CC/RC bytes temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:] cc = otak.auth.check_sig(temp_data, res['cc_rc']) # TODO: CRC else: raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds']) # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2 dec = CompactRemoteResp.parse(res['secured_data']) dec['tar'] = res['tar'] dec['response_status'] = res['response_status'] return dec