mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-16 18:38:32 +03:00
'securityDomain' elements are decoded to ProfileElementSD instances, which keep higher level representations of the key data apart from the decoded[] lists. So far, apply_val() was dropping binary values in decoded[], which does not work, because ProfileElementSD._pre_encode() overwrites self.decoded[] from the higher level representation. Implement using - ProfileElementSD.find_key() and SecurityDomainKeyComponent to modify an existing entry, or - ProfileElementSD.add_key() to create a new entry. Before this patch, SdKey parameters seemed to patch PES successfully, but their modifications did not end up in the encoded DER. (BTW, this does not fix any other errors that may still be present in the various SdKey subclasses, patches coming up.) Related: SYS#6768 Change-Id: I07dfc378705eba1318e9e8652796cbde106c6a52
2126 lines
90 KiB
Python
2126 lines
90 KiB
Python
"""Implementation of SimAlliance/TCA Interoperable Profile handling"""
|
|
|
|
# (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import logging
|
|
import abc
|
|
import io
|
|
import os
|
|
from typing import Tuple, List, Optional, Dict, Union
|
|
from collections import OrderedDict
|
|
from difflib import SequenceMatcher, Match
|
|
|
|
import asn1tools
|
|
import zipfile
|
|
from pySim import javacard
|
|
from osmocom.utils import b2h, h2b, Hexstr
|
|
from osmocom.tlv import BER_TLV_IE, bertlv_parse_tag, bertlv_parse_len
|
|
from osmocom.construct import build_construct, parse_construct, GreedyInteger, GreedyBytes, StripHeaderAdapter
|
|
|
|
from pySim import ts_102_222
|
|
from pySim.utils import dec_imsi
|
|
from pySim.ts_102_221 import FileDescriptor
|
|
from pySim.filesystem import CardADF, Path
|
|
from pySim.ts_31_102 import ADF_USIM
|
|
from pySim.ts_31_103 import ADF_ISIM
|
|
from pySim.esim import compile_asn1_subdir
|
|
from pySim.esim.saip import templates
|
|
from pySim.esim.saip import oid
|
|
from pySim.global_platform import KeyType, KeyUsageQualifier
|
|
from pySim.global_platform.uicc import UiccSdInstallParams
|
|
|
|
asn1 = compile_asn1_subdir('saip')
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class NonMatch(Match):
    """A contiguous non-matching run of data; the inverse notion of difflib.Match."""

    @classmethod
    def from_matchlist(cls, l: List[Match], size: int) -> List['NonMatch']:
        """Build the list of non-matching blocks from its inverse (list of matching blocks).

        The caller must ensure that the input list is ordered, non-overlapping and only
        contains matches at equal offsets in a and b.  The result covers every position
        in [0, size) that no match covers."""
        gaps = []
        pos = 0
        for m in l:
            if m.a != m.b:
                raise ValueError('only works for equal-offset matches')
            assert m.a >= pos
            # zero-length gaps (adjacent matches) are simply skipped
            if m.a > pos:
                gaps.append(cls(a=pos, b=pos, size=m.a - pos))
            pos = m.a + m.size
        # trailing gap after the last match, if any
        if pos < size:
            gaps.append(cls(a=pos, b=pos, size=size - pos))
        return gaps
|
|
|
|
class Naa:
    """A class defining a Network Access Application (NAA)"""
    # short lower-case name such as "usim"; also the key in the NAAs registry
    name = None
    # AID prefix, as used for ADF and EF.DIR
    aid = None
    # service names that any profile carrying this NAA must provide
    mandatory_services = []
    # the ProfileElement types used specifically in this NAA
    pe_types = []
    # we only use the base DN of each OID; there may be subsequent versions underneath it
    templates = []
    adf: CardADF = None

    @classmethod
    def adf_name(cls):
        """PE field name of the application DF, derived from the first mandatory service."""
        first_service = cls.mandatory_services[0]
        return 'adf-' + first_service
|
|
|
|
class NaaCsim(Naa):
    """A class representing the CSIM (CDMA) Network Access Application (NAA)"""
    name = "csim"
    # no AID prefix configured here (empty byte string)
    aid = h2b("")
    mandatory_services = ["csim"]
    # PE types specific to CSIM, including the CDMA parameter PE
    pe_types = ["csim", "opt-csim", "cdmaParameter"]
    templates = [oid.ADF_CSIM_by_default, oid.ADF_CSIMopt_not_by_default]
|
|
|
|
class NaaUsim(Naa):
    """A class representing the USIM Network Access Application (NAA)"""
    name = "usim"
    # AID prefix used for the ADF.USIM and EF.DIR entries
    aid = h2b("a0000000871002")
    mandatory_services = ["usim"]
    # PE types that may appear as part of a USIM NAA
    pe_types = ["usim", "opt-usim", "phonebook", "gsm-access", "eap", "df-5gs", "df-saip",
                "df-snpn", "df-5gprose"]
    # base OIDs of the file-system templates applicable to USIM
    templates = [oid.ADF_USIM_by_default, oid.ADF_USIMopt_not_by_default,
                 oid.DF_PHONEBOOK_ADF_USIM, oid.DF_GSM_ACCESS_ADF_USIM,
                 oid.DF_EAP, oid.DF_5GS, oid.DF_SAIP, oid.DF_SNPN,
                 oid.DF_5GProSe]
    adf = ADF_USIM()
|
|
|
|
class NaaIsim(Naa):
    """A class representing the ISIM Network Access Application (NAA)"""
    name = "isim"
    # AID prefix used for the ADF.ISIM and EF.DIR entries
    aid = h2b("a0000000871004")
    mandatory_services = ["isim"]
    # PE types that may appear as part of an ISIM NAA
    pe_types = ["isim", "opt-isim"]
    templates = [oid.ADF_ISIM_by_default, oid.ADF_ISIMopt_not_by_default]
    adf = ADF_ISIM()
|
|
|
|
# Registry of all known NAA classes, indexed by their name string.
NAAs = {naa.name: naa for naa in (NaaCsim, NaaUsim, NaaIsim)}
|
|
|
|
class File:
    """Internal representation of a file in a profile filesystem.

    Args:
        pename: Name string of the profile element
        l: List of tuples [fileDescriptor, fillFileContent, fillFileOffset profile elements]
        template: Applicable FileTemplate describing defaults as per SAIP spec
        name: Human-readable name like EF.IMSI, DF.TELECOM, ADF.USIM, ...
    """
    def __init__(self, pename: str, l: Optional[List[Tuple]] = None,
                 template: Optional[templates.FileTemplate] = None, name: Optional[str] = None):
        self._template_derived = False
        self.pe_name = pename
        self._name = name
        self.template = template
        self._body: Optional[bytes] = None
        self.node: Optional['FsNode'] = None
        # 'TR' (transparent), 'LF' (linear fixed), 'CY' (cyclic), 'BT' (BER-TLV),
        # or 'MF'/'DF'/'ADF' for directories
        self.file_type = None
        self.fid: Optional[int] = None
        self.sfi: Optional[int] = None
        # referenced security attributes, as raw bytes
        self.arr = None
        self.rec_len: Optional[int] = None
        self.nb_rec: Optional[int] = None
        self._file_size = 0
        self.high_update: bool = False
        self.read_and_update_when_deact: bool = False
        self.shareable: bool = True
        self.df_name = None
        self.fill_pattern = None
        self.fill_pattern_repeat = False
        # apply some defaults from the profile template first, so that explicit
        # data from the decoded tuples (below) can override them
        if self.template:
            self.from_template(self.template)
        if l:
            self.from_tuples(l)

    @property
    def name(self) -> Optional[str]:
        """Human-readable name (e.g. EF.IMSI); falls back to the template's name."""
        if self._name:
            return self._name
        if self.template:
            return self.template.name
        return None

    @property
    def file_size(self) -> Optional[int]:
        """Return the size of the file in bytes, or None if it cannot be determined."""
        if self.file_type in ['LF', 'CY']:
            # derive the number of records from total size and record length, if needed
            if self._file_size and self.nb_rec is None and self.rec_len:
                self.nb_rec = self._file_size // self.rec_len
            if self.nb_rec is None or self.rec_len is None:
                # record-oriented file without enough information to compute its size
                # (previously this raised a TypeError on the multiplication below)
                return None
            return self.nb_rec * self.rec_len
        elif self.file_type in ['TR', 'BT']:
            return self._file_size
        else:
            return None

    @staticmethod
    def get_tuplelist_item(l: List[Tuple], key: str):
        """get the [first] value matching given key from a list of (key, value) tuples."""
        for k, v in l:
            if k == key:
                return v
        return None

    @staticmethod
    def _encode_file_size(size: int) -> bytes:
        """Encode the integer file size into bytes, as needed by the asn1tools encoder."""
        if False:  # TODO: some way to know for which version of SAIP we should encode
            # A V2.0 eUICC may expect file size to be encoded on at least 2 bytes, as specified in
            # ETSI TS 102 222 [102 222], and may reject the encoding without the leading byte.
            return size.to_bytes(2, 'big')
        else:
            # > v2.0 case where it must be encoded on the minimum number of octets possible
            c = GreedyInteger()
            return c.build(size)

    @staticmethod
    def _decode_file_size(size: bytes) -> int:
        """Decode the file size from asn1tools-bytes to integer."""
        c = GreedyInteger()
        return c.parse(size)

    def from_template(self, template: templates.FileTemplate):
        """Determine defaults for file based on given FileTemplate."""
        if self._template_derived:
            raise ValueError('This file already has been initialized by a template before')
        # copy various bits from template
        self.file_type = template.file_type
        self.fid = template.fid
        self.sfi = template.sfi
        self.arr = template.arr.to_bytes(1, 'big')
        # record geometry is only present on record-oriented templates
        self.rec_len = getattr(template, 'rec_len', None)
        self.nb_rec = getattr(template, 'nb_rec', None)
        self.high_update = template.high_update
        # All the files defined in the templates shall have, by default, the
        # shareable/not-shareable bit in the file descriptor set to "shareable".
        self.shareable = True
        self._template_derived = True
        if hasattr(template, 'file_size'):
            self._file_size = template.file_size

    def _recompute_size(self):
        """recompute the file size, if needed (body larger than current size)"""
        body_size = len(self.body)
        if self.file_size is None or self.file_size < body_size:
            self._file_size = body_size

    @property
    def body(self):
        """The linearized file content as a flat byte string (None if not set)."""
        return self._body

    @body.setter
    def body(self, value: bytes):
        self._body = value
        # we need to potentially update the file size after changing the body [size]
        self._recompute_size()

    def to_fileDescriptor(self) -> dict:
        """Convert from internal representation to 'fileDescriptor' as used by asn1tools for SAIP"""
        fileDescriptor = {}
        fdb_dec = {}
        pefi = {}
        spfi = 0
        # only encode values that deviate from the template defaults; files without a
        # template (e.g. created via GFM) always encode their explicit values
        if self.fid and (not self.template or self.fid != self.template.fid):
            fileDescriptor['fileID'] = self.fid.to_bytes(2, 'big')
        if self.sfi and (not self.template or self.sfi != self.template.sfi):
            fileDescriptor['shortEFID'] = bytes([self.sfi])
        if self.df_name:
            fileDescriptor['dfName'] = self.df_name
        if self.arr and (not self.template or self.arr != self.template.arr.to_bytes(1, 'big')):
            fileDescriptor['securityAttributesReferenced'] = self.arr
        if self.file_type in ['LF', 'CY']:
            fdb_dec['file_type'] = 'working_ef'
            if self.nb_rec and self.rec_len:
                fileDescriptor['efFileSize'] = self._encode_file_size(self.file_size)
            if self.file_type == 'LF':
                fdb_dec['structure'] = 'linear_fixed'
            elif self.file_type == 'CY':
                fdb_dec['structure'] = 'cyclic'
        elif self.file_type == 'BT':
            fdb_dec['file_type'] = 'working_ef'
            fdb_dec['structure'] = 'ber_tlv'
            if self.file_size:
                pefi['maximumFileSize'] = self._encode_file_size(self.file_size)
        elif self.file_type == 'TR':
            fdb_dec['file_type'] = 'working_ef'
            fdb_dec['structure'] = 'transparent'
            if self.file_size:
                fileDescriptor['efFileSize'] = self._encode_file_size(self.file_size)
        elif self.file_type in ['MF', 'DF', 'ADF']:
            fdb_dec['file_type'] = 'df'
            fdb_dec['structure'] = 'no_info_given'
        # build file descriptor based on above input data
        fd_dict = {}
        if len(fdb_dec):
            fdb_dec['shareable'] = self.shareable
            fd_dict['file_descriptor_byte'] = fdb_dec
        if self.rec_len:
            fd_dict['record_len'] = self.rec_len
        if len(fd_dict):
            fileDescriptor['fileDescriptor'] = build_construct(FileDescriptor._construct, fd_dict)
        if self.high_update:
            spfi |= 0x80  # TS 102 222 Table 5
        if self.read_and_update_when_deact:
            spfi |= 0x40  # TS 102 222 Table 5
        if spfi != 0x00:
            pefi['specialFileInformation'] = spfi.to_bytes(1, 'big')
        if self.fill_pattern:
            if not self.fill_pattern_repeat:
                pefi['fillPattern'] = self.fill_pattern
            else:
                pefi['repeatPattern'] = self.fill_pattern
        if len(pefi.keys()):
            # TODO: When overwriting the default "proprietaryEFInfo" for a template EF for which a
            # default fill or repeat pattern is defined; it is hence recommended to provide the
            # desired fill or repeat pattern in the "proprietaryEFInfo" element for the EF in Profiles
            # downloaded to a V2.2 or earlier eUICC.
            fileDescriptor['proprietaryEFInfo'] = pefi
        logger.debug("%s: to_fileDescriptor(%s)" % (self, fileDescriptor))
        return fileDescriptor

    def from_fileDescriptor(self, fileDescriptor: dict):
        """Convert from 'fileDescriptor' as used by asn1tools for SAIP to internal representation"""
        logger.debug("%s: from_fileDescriptor(%s)" % (self, fileDescriptor))
        fileID = fileDescriptor.get('fileID', None)
        if fileID:
            self.fid = int.from_bytes(fileID, 'big')
        shortEFID = fileDescriptor.get('shortEFID', None)
        if shortEFID:
            self.sfi = shortEFID[0]
        dfName = fileDescriptor.get('dfName', None)
        if dfName:
            self.df_name = dfName
        efFileSize = fileDescriptor.get('efFileSize', None)
        if efFileSize:
            self._file_size = self._decode_file_size(efFileSize)

        pefi = fileDescriptor.get('proprietaryEFInfo', {})
        securityAttributesReferenced = fileDescriptor.get('securityAttributesReferenced', None)
        if securityAttributesReferenced:
            self.arr = securityAttributesReferenced
        if 'fileDescriptor' in fileDescriptor:
            fd_dec = parse_construct(FileDescriptor._construct, fileDescriptor['fileDescriptor'])
            fdb_dec = fd_dec['file_descriptor_byte']
            self.shareable = fdb_dec['shareable']
            if fdb_dec['file_type'] == 'working_ef':
                if fd_dec['num_of_rec']:
                    self.nb_rec = fd_dec['num_of_rec']
                if fd_dec['record_len']:
                    self.rec_len = fd_dec['record_len']
                if efFileSize:
                    if self.rec_len and self.nb_rec is None:
                        # compute the number of records from file size and record length
                        self.nb_rec = self._file_size // self.rec_len
                if fdb_dec['structure'] == 'linear_fixed':
                    self.file_type = 'LF'
                elif fdb_dec['structure'] == 'cyclic':
                    self.file_type = 'CY'
                elif fdb_dec['structure'] == 'transparent':
                    self.file_type = 'TR'
                elif fdb_dec['structure'] == 'ber_tlv':
                    self.file_type = 'BT'
                if 'maximumFileSize' in pefi:
                    self._file_size = self._decode_file_size(pefi['maximumFileSize'])
                specialFileInformation = pefi.get('specialFileInformation', None)
                if specialFileInformation:
                    # TS 102 222 Table 5
                    if specialFileInformation[0] & 0x80:
                        self.high_update = True
                    if specialFileInformation[0] & 0x40:
                        self.read_and_update_when_deact = True
                if 'repeatPattern' in pefi:
                    self.fill_pattern = pefi['repeatPattern']
                    self.fill_pattern_repeat = True
                if 'fillPattern' in pefi:
                    self.fill_pattern = pefi['fillPattern']
                    self.fill_pattern_repeat = False
            elif fdb_dec['file_type'] == 'df':
                # only set it, if an earlier call to from_template() didn't already set it, as
                # the template can differentiate between MF, DF and ADF (unlike FDB)
                if not self.file_type:
                    self.file_type = 'DF'
        else:
            if not self._template_derived:
                # FIXME: this shouldn't happen? How can this be? But I see it in real profiles
                #raise ValueError("%s: from_fileDescriptor without nested 'fileDescriptor'" % self)
                pass

        logger.debug("\t%s" % repr(self))

    def from_tuples(self, l: List[Tuple]):
        """Parse a list of fileDescriptor, fillFileContent, fillFileOffset tuples into this instance."""
        # fileDescriptor
        fd = self.get_tuplelist_item(l, 'fileDescriptor')
        if not fd and not self._template_derived:
            raise ValueError("%s: No fileDescriptor found in tuple, and none set by template before" % self)
        if fd:
            self.from_fileDescriptor(dict(fd))
        # BODY
        self._body = self.file_content_from_tuples(l)

    @staticmethod
    def path_from_gfm(bin_path: bytes):
        """convert from byte-array of 16bit FIDs to list of integers"""
        return [int.from_bytes(bin_path[i:i+2], 'big') for i in range(0, len(bin_path), 2)]

    @staticmethod
    def path_to_gfm(path: List[int]) -> bytes:
        """convert from list of 16bit integers to byte-array"""
        return b''.join([x.to_bytes(2, 'big') for x in path])

    def to_tuples(self) -> List[Tuple]:
        """Generate a list of fileDescriptor, fillFileContent, fillFileOffset tuples from this instance."""
        return [('fileDescriptor', self.to_fileDescriptor())] + self.file_content_to_tuples()

    def to_gfm(self) -> List[Tuple]:
        """Generate a list of filePath, createFCP, fillFileContent, fillFileOffset tuples from this
        instance.  NOTE: requires self.path to have been set externally (as done by
        ProfileElementGFM.pe2files) — TODO confirm all callers guarantee this."""
        ret = [('filePath', self.path_to_gfm(self.path)), ('createFCP', self.to_fileDescriptor())]
        ret += self.file_content_to_tuples()
        return ret

    def expand_fill_pattern(self) -> bytes:
        """Expand the fill/repeat pattern as per TS 102 222 Section 6.3.2.2.2"""
        return ts_102_222.expand_pattern(self.fill_pattern, self.fill_pattern_repeat, self.file_size)

    def file_content_from_tuples(self, l: List[Tuple]) -> Optional[bytes]:
        """linearize a list of fillFileContent / fillFileOffset tuples into a stream of bytes."""
        stream = io.BytesIO()
        # Providing file content within "fillFileContent" / "fillFileOffset" shall have the same
        # effect as creating a file with a fill/repeat pattern and thereafter updating the content
        # via Update.
        # Step 1: Fill with pattern from Fcp or Template
        if self.fill_pattern:
            stream.write(self.expand_fill_pattern())
        elif self.template and self.template.default_val:
            stream.write(self.template.expand_default_value_pattern(self.file_size))
        stream.seek(0)
        # then process the fillFileContent/fillFileOffset
        for k, v in l:
            if k == 'doNotCreate':
                return None
            if k == 'fileDescriptor':
                pass
            elif k == 'fillFileOffset':
                # offsets are relative to the current position
                stream.seek(v, os.SEEK_CUR)
            elif k == 'fillFileContent':
                stream.write(v)
            else:
                # BUGFIX: this used to 'return' the exception object instead of raising it
                raise ValueError("Unknown key '%s' in tuple list" % k)
        return stream.getvalue()

    def file_content_to_tuples(self, optimize: bool = False) -> List[Tuple]:
        """Encode the file contents into a list of fillFileContent / fillFileOffset tuples that can be fed
        into the asn.1 encoder. If optimize is True, it will try to encode only the differences from the
        fillFileContent of the profile template. Otherwise, the entire file contents will be encoded
        as-is."""
        if self.file_type not in ['TR', 'LF', 'CY', 'BT']:
            return []
        if not optimize:
            # simplistic approach: encode the full file, ignoring the template/default
            return [('fillFileContent', self.body)]
        # Try to 'compress' the file body, based on the default file contents.
        if self.template:
            default = self.template.expand_default_value_pattern(length=len(self.body))
            if not default:
                sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
            else:
                if default == self.body:
                    # 100% match: return an empty tuple list to make eUICC use the default
                    return []
                sm = SequenceMatcher(a=default, b=self.body)
        else:
            # no template at all: we can only remove padding
            sm = SequenceMatcher(a=b'\xff'*len(self.body), b=self.body)
        matching_blocks = sm.get_matching_blocks()
        # we can only make use of matches that have the same offset in 'a' and 'b'
        matching_blocks = [x for x in matching_blocks if x.size > 0 and x.a == x.b]
        non_matching_blocks = NonMatch.from_matchlist(matching_blocks, self.file_size)
        ret = []
        cur = 0
        for block in non_matching_blocks:
            # fillFileOffset is relative to the current write position (SEEK_CUR semantics in
            # file_content_from_tuples), i.e. the end of the previously written content.
            offset = block.a - cur
            if offset:
                ret.append(('fillFileOffset', offset))
            ret.append(('fillFileContent', self.body[block.a:block.a+block.size]))
            # BUGFIX: track the absolute end of the written block; 'cur += block.size'
            # computed wrong offsets whenever a matching region preceded a non-match.
            cur = block.a + block.size
        return ret

    def __str__(self) -> str:
        return "File(%s)" % self.pe_name

    def __repr__(self) -> str:
        return "File(%s, %s, %04X, SFI=%s)" % (self.pe_name, self.file_type, self.fid or 0, self.sfi)

    def check_template_modification_rules(self):
        """Check template modification rules as per SAIP section 8.3.3."""
        if not self.template:
            return None
|
|
|
|
|
|
class ProfileElement:
    """Generic Class representing a Profile Element (PE) within a SAIP Profile. This may be used directly,
    but it's more likely sub-classed with a specific class for the specific profile element type, like e.g
    ProfileElementHeader, ProfileElementMF, ...
    """
    FILE_BEARING = ['mf', 'cd', 'telecom', 'usim', 'opt-usim', 'isim', 'opt-isim', 'phonebook', 'gsm-access',
                    'csim', 'opt-csim', 'eap', 'df-5gs', 'df-saip', 'df-snpn', 'df-5gprose', 'iot', 'opt-iot']
    # in their infinite wisdom the spec authors used inconsistent/irregular naming of PE type vs. header
    # field names, so we have to manually translate the exceptions here...
    header_name_translation_dict = {
        'header': None,
        'end': 'end-header',
        'genericFileManagement': 'gfm-header',
        'akaParameter': 'aka-header',
        'cdmaParameter': 'cdma-header',
        # note how they couldn't even consistently capitalize the 'header' suffix :(
        'application': 'app-Header',
        'pukCodes': 'puk-Header',
        'pinCodes': 'pin-Header',
        'securityDomain': 'sd-Header',
        'df-5gprose': 'df-5g-prose-header',
    }

    def __init__(self, decoded = None, mandated: bool = True,
                 pe_sequence: Optional['ProfileElementSequence'] = None):
        """
        Instantiate a new ProfileElement. This is usually either called with the 'decoded' argument after
        reading a SAIP-DER-encoded PE. Alternatively, when constructing a PE from scratch, decoded is None,
        and a minimal PE-Header is generated.

        Args:
            decoded: asn1tools-generated decoded structure for this PE
            mandated: Whether or not the PE-Header should contain the mandated attribute
            pe_sequence: back-reference to the PE-Sequence of which we're part of
        """
        self.pe_sequence = pe_sequence
        if decoded:
            self.decoded = decoded
        else:
            self.decoded = OrderedDict()
            # BUGFIX: build the header dict once instead of overwriting it; the old code
            # replaced {'identification': None} with {'mandated': None}, losing the
            # (required) identification field whenever mandated was True, and wrote to
            # decoded[None] for header-less PE types.
            if self.header_name:
                hdr = {'identification': None}
                if mandated:
                    hdr['mandated'] = None
                self.decoded[self.header_name] = hdr

    @property
    def header_name(self) -> str:
        """Return the name of the header field within the profile element."""
        # unnecessary complication by inconsistent naming :(
        if self.type.startswith('opt-'):
            return self.type.replace('-','') + '-header'
        if self.type in self.header_name_translation_dict:
            return self.header_name_translation_dict[self.type]
        return self.type + '-header'

    @property
    def header(self):
        """The decoded ProfileHeader."""
        return self.decoded.get(self.header_name, None)

    @property
    def identification(self):
        """The identification value: An unique number for the PE within the PE-Sequence."""
        if self.header:
            return self.header['identification']
        else:
            return None

    @property
    def templateID(self):
        """Return the decoded templateID used by this profile element (if any)."""
        return self.decoded.get('templateID', None)

    @classmethod
    def class_for_petype(cls, pe_type: str) -> Optional['ProfileElement']:
        """Return the subclass implementing the given pe-type string."""
        class4petype = {
            # use same order as ASN.1 source definition of "ProfileElement ::= CHOICE {"
            'header': ProfileElementHeader,
            'genericFileManagement': ProfileElementGFM,
            'pinCodes': ProfileElementPin,
            'pukCodes': ProfileElementPuk,
            'akaParameter': ProfileElementAKA,
            # TODO: cdmaParameter
            'securityDomain': ProfileElementSD,
            'rfm': ProfileElementRFM,
            'application': ProfileElementApplication,
            # TODO: nonStandard
            'end': ProfileElementEnd,
            'mf': ProfileElementMF,
            'cd': ProfileElementCD,
            'telecom': ProfileElementTelecom,
            'usim': ProfileElementUSIM,
            'opt-usim': ProfileElementOptUSIM,
            'isim': ProfileElementISIM,
            'opt-isim': ProfileElementOptISIM,
            'phonebook': ProfileElementPhonebook,
            'gsm-access': ProfileElementGsmAccess,
            # TODO: csim
            # TODO: opt-csim
            'eap': ProfileElementEAP,
            'df-5gs': ProfileElementDf5GS,
            'df-saip': ProfileElementDfSAIP,
            'df-snpn': ProfileElementDfSNPN,
            'df-5gprose': ProfileElementDf5GProSe,
        }
        if pe_type in class4petype:
            return class4petype[pe_type]
        else:
            return None

    @classmethod
    def from_der(cls, der: bytes,
                 pe_sequence: Optional['ProfileElementSequence'] = None) -> 'ProfileElement':
        """Construct an instance from given raw, DER encoded bytes.

        Args:
            der: raw, DER-encoded bytes of a single PE
            pe_sequence: back-reference to the PE-Sequence of which this PE is part of
        """
        pe_type, decoded = asn1.decode('ProfileElement', der)
        pe_cls = cls.class_for_petype(pe_type)
        if pe_cls:
            inst = pe_cls(decoded, pe_sequence=pe_sequence)
        else:
            # fall back to the generic base class for not-yet-implemented PE types
            inst = ProfileElement(decoded, pe_sequence=pe_sequence)
        inst.type = pe_type
        # run any post-decoder a derived class may have
        if hasattr(inst, '_post_decode'):
            inst._post_decode()
        return inst

    def to_der(self) -> bytes:
        """Build an encoded DER representation of the instance."""
        # run any pre-encoder a derived class may have
        if hasattr(self, '_pre_encode'):
            self._pre_encode()
        return asn1.encode('ProfileElement', (self.type, self.decoded))

    def __str__(self) -> str:
        return self.type
|
|
|
|
class FsProfileElement(ProfileElement):
    """A file-system bearing profile element, like MF, USIM, ....

    We keep two major representations of the data:
    * The "decoded" member, as introduced by our parent class, containing asn1tools syntax
    * the "files" dict, consisting of File values indexed by PE-name strings

    The methods pe2files and files2pe convert between those two representations.
    """
    def __init__(self, decoded = None, mandated: bool = True, **kwargs):
        super().__init__(decoded, mandated, **kwargs)
        # indexed by PE-Name
        self.files = {}
        # resolve ASN.1 type definition; needed to e.g. iterate field names (for file pe-names)
        self.tdef = asn1.types['ProfileElement'].type.name_to_member[self.type]

    def file_template_for_path(self, path: Path, adf: Optional[str] = None) -> Optional[templates.FileTemplate]:
        """Resolve the FileTemplate for given path, if we have any matching.

        Args:
            path: the path for which we would like to resolve the FileTemplate
            adf: string name of the ADF which might be used with this PE
        """
        template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
        for f in template.files:
            if f.path == path:
                return f
            # optionally prefix with ADF name of NAA
            if adf and Path(adf) + f.path == path:
                return f
        return None

    def supports_file_for_path(self, path: Path, adf: Optional[str] = None) -> bool:
        """Does this ProfileElement support a file of given path?"""
        # BUGFIX: forward the adf argument, which used to be silently dropped
        return self.file_template_for_path(path, adf) is not None

    def add_file(self, file: File):
        """Add a File to the ProfileElement."""
        logger.debug("%s.add_file: %s" % (self.__class__.__name__, repr(file)))
        template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
        if file.pe_name in self.files:
            # BUGFIX: the attribute is 'pe_name'; 'file.pename' raised AttributeError here
            raise KeyError('Cannot add file: %s already exists' % file.pe_name)
        self.files[file.pe_name] = file
        if self.pe_sequence:
            if file.pe_name == 'mf': #file.fid == 0x3f00:
                if self.pe_sequence.mf:
                    raise ValueError("PE Sequence already has MF, cannot add another one")
                file.node = FsNodeMF(file)
                self.pe_sequence.mf = file.node
                self.pe_sequence.cur_df = file.node
            else:
                if not template.extends and file.pe_name != template.base_df().pe_name:
                    # FsNodeDF of the first [A]DF of the PE
                    pe_df = self.files[template.base_df().pe_name].node
                    if file.template:
                        for d in file.template.ppath: # TODO: revert list?
                            pe_df = pe_df[d]
                    self.pe_sequence.cur_df = pe_df
                elif template.parent:
                    # this is a template that belongs into the [A]DF of another template
                    # 1) find the PE for the referenced template
                    parent_pe = self.pe_sequence.get_closest_prev_pe_for_templateID(self, template.parent.oid)
                    # 2) resolve the [A]DF that forms the base of that parent PE
                    pe_df = parent_pe.files[template.parent.base_df().pe_name].node
                    self.pe_sequence.cur_df = pe_df
                self.pe_sequence.cur_df = self.pe_sequence.cur_df.add_file(file)

    def file2pe(self, file: File):
        """Update the "decoded" member for the given file with the contents from the given File instance.
        We expect that the File instance is part of self.files"""
        if self.files[file.pe_name] != file:
            raise ValueError("The file you passed is not part of this ProfileElement")
        self.decoded[file.pe_name] = file.to_tuples()

    def files2pe(self):
        """Update the "decoded" member for each file with the contents of the "files" member."""
        for k, f in self.files.items():
            self.decoded[k] = f.to_tuples()

    def pe2files(self):
        """Update the "files" member with the contents of the "decoded" member."""
        logger.debug("%s.pe2files(%s)" % (self.__class__.__name__, self.type))
        tdict = {x.name: x for x in self.tdef.root_members}
        template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
        for k, v in self.decoded.items():
            # only fields of ASN.1 type 'File' represent files in the filesystem
            if tdict[k].type_name == 'File':
                file = File(k, v, template.files_by_pename.get(k, None))
                self.add_file(file)

    def create_file(self, pename: str) -> File:
        """Programmatically create a file by its PE-Name."""
        template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
        file = File(pename, None, template.files_by_pename.get(pename, None))
        self.add_file(file)
        self.decoded[pename] = []
        return file

    def _post_decode(self):
        # not entirely sure about doing this this automatism
        self.pe2files()

    def _pre_encode(self):
        # should we do self.pe2files()? I don't think so
        #self.files2pe()
        pass
|
|
|
|
class ProfileElementGFM(ProfileElement):
    """Profile element for generic file management: carries an arbitrary sequence of
    filePath / createFCP / fillFileOffset / fillFileContent commands instead of
    template-defined per-file fields."""
    type = 'genericFileManagement'

    @staticmethod
    def path_str(path: List[int]) -> str:
        """Render a list of 16-bit FIDs as a slash-separated hex string, e.g. '3F00/7F10'."""
        return '/'.join(['%04X' % x for x in path])

    def __init__(self, decoded = None, mandated: bool = True, **kwargs):
        super().__init__(decoded, mandated, **kwargs)
        # indexed by PE-Name
        self.files = {}
        self.tdef = asn1.types['ProfileElement'].type.name_to_member[self.type]
        if decoded:
            return
        # freshly-constructed PE: start with an empty command list
        self.decoded['fileManagementCMD'] = []

    def supports_file_for_path(self, path: Path, adf: Optional[str] = None) -> bool:
        """Does this ProfileElement support a file of given path?"""
        # GFM supports all files in all paths...
        return True

    def add_file(self, file: File, path: Path):
        """Register a File under the given path, linking it into the pe_sequence's
        filesystem tree when one is available."""
        logger.debug("%s.add_file(path=%s, %s)" % (self.__class__.__name__, path, repr(file)))
        if hasattr(self.pe_sequence, 'mf'):
            # resolve the parent DF node in the existing tree
            if isinstance(path, Path):
                parent = self.pe_sequence.mf.lookup_by_path(Path(path[:-1]))
            else:
                parent = self.pe_sequence.mf.lookup_by_fidpath(path[:-1])

            path_str = self.path_str(parent.fid_path + [file.fid])
        else:
            # no tree available: fall back to the textual path as dict key
            path_str = str(path)
        if path_str in self.files:
            raise KeyError('Cannot add file: %s already exists' % path_str)
        self.files[path_str] = file
        if hasattr(self.pe_sequence, 'mf'):
            self.pe_sequence.cur_df = parent.add_file(file)

    def pe2files(self):
        """Update the "files" member with the contents of the "decoded" member."""
        def perform(self, path: List[int], file_elements):
            # flush the currently-accumulated file elements (if any) into a File
            if len(file_elements):
                if self.pe_sequence:
                    self.pe_sequence.cd(path)
                file = File(None, file_elements)
                file.path = path
                file.pe_name = self.path_str(path + [file.fid])
                self.add_file(file, path + [file.fid])

        logger.debug("="*70 + " " + self.type)
        #logger.debug(self.decoded)
        path = [0x3f00] # current DF at start of PE: MF
        file_elements = []
        # looks like TCA added one level too much in the ASN.1 hierarchy here
        for fmc in self.decoded['fileManagementCMD']:
            for fmc2 in fmc:
                if fmc2[0] == 'filePath':
                    # selecting a new path means we're done with the previous file
                    perform(self, path, file_elements)
                    file_elements = []
                    if fmc2[1] == b'':
                        # empty path selects the MF
                        path = [0x3f00]
                    else:
                        path = [0x3f00] + File.path_from_gfm(fmc2[1])
                    #logger.debug("filePath %s -> path=%s" % (fmc2[1], path))
                elif fmc2[0] == 'createFCP':
                    # new FCP means new file; perform the old one
                    perform(self, path, file_elements)
                    file_elements = [('fileDescriptor', fmc2[1])]
                elif fmc2[0] == 'fillFileOffset' or fmc2[0] == 'fillFileContent':
                    # content fragments accumulate until the next filePath/createFCP
                    file_elements.append(fmc2)
                else:
                    raise ValueError("Unknown GFM choice '%s'" % fmc2[0])
        # add the last file, if we still have any pending data in file_elements
        perform(self, path, file_elements)

    def files2pe(self):
        """Update the "decoded" member from the "files" member."""
        # FIXME: implement this
        # sort / iterate by path; issue filePath, createFCP and fillFile{Offset,Content} elements

    def _post_decode(self):
        # not entirely sure about this automatism
        self.pe2files()
|
|
|
|
|
|
class ProfileElementMF(FsProfileElement):
    """Class representing the ProfileElement for the MF (Master File)"""
    type = 'mf'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.MF)
        self.decoded.update({fname: [] for fname in ('mf', 'ef-iccid', 'ef-dir', 'ef-arr')})
        # TODO: resize EF.DIR?
|
|
|
|
class ProfileElementPuk(ProfileElement):
    """Class representing the ProfileElement for a PUK (PIN Unblocking Code)"""
    type = 'pukCodes'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        """Initialize either from an asn1tools-decoded structure, or with defaults.

        Args:
            decoded: asn1tools-generated decoded structure for this PE (optional)
        """
        super().__init__(decoded, **kwargs)
        if decoded:
            return
        # provide some reasonable defaults
        self.decoded['pukCodes'] = []
        self.add_puk(0x01, b'11111111')
        self.add_puk(0x81, b'22222222')

    def add_puk(self, key_ref: int, puk_value: bytes, max_attempts: int = 10, retries_left: int = 10):
        """Add a PUK to the pukCodes ProfileElement.

        Args:
            key_ref: key reference of the PUK (uint8)
            puk_value: 8-byte PUK value
            max_attempts: maximum number of attempts (4 bit)
            retries_left: remaining number of attempts (4 bit)
        Raises:
            ValueError: if any argument is outside its permitted range
        """
        if key_ref < 0 or key_ref > 0xff:
            raise ValueError('key_ref must be uint8')
        if len(puk_value) != 8:
            raise ValueError('puk_value must be 8 bytes long')
        if max_attempts < 0 or max_attempts > 0xf:
            raise ValueError('max_attempts must be 4 bit')
        # bug fix: the upper-bound check previously re-tested max_attempts
        # instead of retries_left
        if retries_left < 0 or retries_left > 0xf:
            raise ValueError('retries_left must be 4 bit')
        puk = {
            'keyReference': key_ref,
            'pukValue': puk_value,
            # nibble-packed: attempts in the high nibble, retries in the low one
            'maxNumOfAttemps-retryNumLeft': (max_attempts << 4) | retries_left,
        }
        self.decoded['pukCodes'].append(puk)
|
|
|
|
|
|
class ProfileElementPin(ProfileElement):
    """Class representing the ProfileElement for a PIN (Personal Identification Number)"""
    type = 'pinCodes'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        """Initialize either from an asn1tools-decoded structure, or with defaults.

        Args:
            decoded: asn1tools-generated decoded structure for this PE (optional)
        """
        super().__init__(decoded, **kwargs)
        if decoded:
            return
        # provide some reasonable defaults
        self.decoded['pinCodes'] = ('pinconfig', [])
        self.add_pin(0x01, b'0000\xff\xff\xff\xff', unblock_ref=1, pin_attrib=6)
        self.add_pin(0x10, b'11111111', pin_attrib=3)

    def add_pin(self, key_ref: int, pin_value: bytes, max_attempts: int = 3, retries_left: int = 3,
                unblock_ref: Optional[int] = None, pin_attrib: int = 7):
        """Add a PIN to the pinCodes ProfileElement.

        Args:
            key_ref: key reference of the PIN (uint8)
            pin_value: 8-byte (padded) PIN value
            max_attempts: maximum number of attempts (4 bit)
            retries_left: remaining number of attempts (4 bit)
            unblock_ref: key reference of the PUK that unblocks this PIN; note that
                a falsy value (None or 0) means "no unblocking PIN reference"
            pin_attrib: pinAttributes bit-mask (uint8)
        Raises:
            ValueError: if any argument is outside its permitted range
        """
        if key_ref < 0 or key_ref > 0xff:
            raise ValueError('key_ref must be uint8')
        if pin_attrib < 0 or pin_attrib > 0xff:
            raise ValueError('pin_attrib must be uint8')
        if len(pin_value) != 8:
            raise ValueError('pin_value must be 8 bytes long')
        if max_attempts < 0 or max_attempts > 0xf:
            raise ValueError('max_attempts must be 4 bit')
        # bug fix: the upper-bound check previously re-tested max_attempts
        # instead of retries_left
        if retries_left < 0 or retries_left > 0xf:
            raise ValueError('retries_left must be 4 bit')
        pin = {
            'keyReference': key_ref,
            'pinValue': pin_value,
            # nibble-packed: attempts in the high nibble, retries in the low one
            'maxNumOfAttemps-retryNumLeft': (max_attempts << 4) | retries_left,
            'pinAttributes': pin_attrib,
        }
        if unblock_ref:
            pin['unblockingPINReference'] = unblock_ref
        self.decoded['pinCodes'][1].append(pin)
|
|
|
|
|
|
class ProfileElementTelecom(FsProfileElement):
    """Class representing the ProfileElement for DF.TELECOM"""
    type = 'telecom'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_TELECOM_v2)
        self.decoded.update({fname: [] for fname in ('df-telecom', 'ef-arr')})
|
|
|
|
class ProfileElementCD(FsProfileElement):
    """Class representing the ProfileElement for DF.CD"""
    type = 'cd'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_CD)
        self.decoded.update({fname: [] for fname in ('df-cd',)})
|
|
|
|
class ProfileElementPhonebook(FsProfileElement):
    """Class representing the ProfileElement for DF.PHONEBOOK"""
    type = 'phonebook'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_PHONEBOOK_ADF_USIM)
        self.decoded.update({fname: [] for fname in ('df-phonebook',)})
|
|
|
|
class ProfileElementGsmAccess(FsProfileElement):
    """Class representing the ProfileElement for ADF.USIM/DF.GSM-ACCESS"""
    type = 'gsm-access'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_GSM_ACCESS_ADF_USIM)
        self.decoded.update({fname: [] for fname in ('df-gsm-access',)})
|
|
|
|
class ProfileElementDf5GS(FsProfileElement):
    """Class representing the ProfileElement for ADF.USIM/DF.5GS"""
    type = 'df-5gs'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_5GS_v3)
        self.decoded.update({fname: [] for fname in ('df-df-5gs',)})
|
|
|
|
class ProfileElementEAP(FsProfileElement):
    """Class representing the ProfileElement for DF.EAP"""
    type = 'eap'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_EAP)
        self.decoded.update({fname: [] for fname in ('df-eap', 'ef-eapstatus')})
|
|
|
|
class ProfileElementDfSAIP(FsProfileElement):
    """Class representing the ProfileElement for DF.SAIP"""
    type = 'df-saip'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_SAIP)
        self.decoded.update({fname: [] for fname in ('df-df-saip',)})
|
|
|
|
class ProfileElementDfSNPN(FsProfileElement):
    """Class representing the ProfileElement for DF.SNPN"""
    type = 'df-snpn'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_SNPN)
        self.decoded.update({fname: [] for fname in ('df-df-snpn',)})
|
|
|
|
class ProfileElementDf5GProSe(FsProfileElement):
    """Class representing the ProfileElement for DF.5GProSe"""
    type = 'df-5gprose'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.DF_5GProSe)
        self.decoded.update({fname: [] for fname in ('df-df-5g-prose',)})
|
|
|
|
|
|
class SecurityDomainKeyComponent:
    """Representation of a key-component of a key for a security domain."""

    def __init__(self, key_type: str, key_data: bytes, mac_length: int = 8):
        # key_type: symbolic key-type name (mapped via KeyType when encoding)
        self.key_type = key_type
        # key_data: raw key material
        self.key_data = key_data
        # mac_length: MAC length in bytes associated with this component
        self.mac_length = mac_length

    def __repr__(self) -> str:
        return 'SdKeyComp(type=%s, mac_len=%u, data=%s)' % (
            self.key_type, self.mac_length, b2h(self.key_data))

    @classmethod
    def from_saip_dict(cls, saip: dict) -> 'SecurityDomainKeyComponent':
        """Construct instance from the dict as generated by SAIP asn.1 decoder."""
        ktype = KeyType.parse(saip['keyType'])
        return cls(ktype, saip['keyData'], saip['macLength'])

    def to_saip_dict(self) -> dict:
        """Express instance in the dict format required by SAIP asn.1 encoder."""
        return {
            'keyType': KeyType.build(self.key_type),
            'keyData': self.key_data,
            'macLength': self.mac_length,
        }
|
|
|
|
class SecurityDomainKey:
|
|
"""Representation of a key used for SCP access to a security domain."""
|
|
def __init__(self, key_version_number: int, key_id: int, key_usage_qualifier: dict,
|
|
key_components: List[SecurityDomainKeyComponent]):
|
|
self.key_usage_qualifier = key_usage_qualifier
|
|
self.key_identifier = key_id
|
|
self.key_version_number = key_version_number
|
|
self.key_components = key_components
|
|
|
|
def __repr__(self) -> str:
|
|
return 'SdKey(KVN=0x%02x, ID=0x%02x, Usage=0x%x, Comp=%s)' % (self.key_version_number,
|
|
self.key_identifier,
|
|
build_construct(KeyUsageQualifier, self.key_usage_qualifier)[0],
|
|
repr(self.key_components))
|
|
|
|
@classmethod
|
|
def from_saip_dict(cls, saip: dict) -> 'SecurityDomainKey':
|
|
"""Construct instance from the dict as generated by SAIP asn.1 decoder."""
|
|
inst = cls(int.from_bytes(saip['keyVersionNumber'], "big"),
|
|
int.from_bytes(saip['keyIdentifier'], "big"),
|
|
KeyUsageQualifier.parse(saip['keyUsageQualifier']),
|
|
[SecurityDomainKeyComponent.from_saip_dict(x) for x in saip['keyComponents']])
|
|
return inst
|
|
|
|
def to_saip_dict(self) -> dict:
|
|
"""Express instance in the dict format required by SAIP asn.1 encoder."""
|
|
return {'keyUsageQualifier': KeyUsageQualifier.build(self.key_usage_qualifier),
|
|
'keyIdentifier': bytes([self.key_identifier]),
|
|
'keyVersionNumber': bytes([self.key_version_number]),
|
|
'keyComponents': [k.to_saip_dict() for k in self.key_components]}
|
|
|
|
def get_key_component(self, key_type):
|
|
for kc in self.key_components:
|
|
if kc.key_type == key_type:
|
|
return kc.key_data
|
|
return None
|
|
|
|
|
|
class ProfileElementSD(ProfileElement):
    """Class representing a securityDomain ProfileElement.

    Keeps higher-level representations of the security-domain install parameters
    (self.usip) and key list (self.keys, as SecurityDomainKey instances) next to
    the asn1tools-level 'decoded' dict; _post_decode()/_pre_encode() convert
    between the two, with the higher-level representation being authoritative
    at encode time."""
    type = 'securityDomain'

    # TODO: New parameters "openPersoData" and "catTpParameters" have been defined for Security
    # Domains in v2.2. A V2.0 or V2.1 eUICC may reject these new options; it is hence recommended to
    # avoid using such parameters in Profiles downloaded to a V2.0 or V2.1 eUICC.

    class C9(BER_TLV_IE, tag=0xC9, nested=UiccSdInstallParams):
        # BER-TLV wrapper (tag C9) around the UICC SD installation parameters
        pass

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        """Initialize either from an asn1tools-decoded structure, or with
        reasonable MNO-SD defaults; either way, derive the higher-level
        representations via _post_decode()."""
        super().__init__(decoded, **kwargs)
        if decoded:
            self._post_decode()
            return
        # provide some reasonable defaults for a MNO-SD
        self.decoded['instance'] = {
            'applicationLoadPackageAID': h2b('A0000001515350'),
            'classAID': h2b('A000000251535041'),
            'instanceAID': h2b('A000000151000000'),
            # Optional: extraditeSecurityDomainAID
            'applicationPrivileges': h2b('82FC80'),
            # Optional: lifeCycleState
            'applicationSpecificParametersC9': h2b('8201f09301f08701f0'), # we assume user uses add_scp()
            # Optional: systemSpecificParameters
            'applicationParameters': {
                # TAR: B20100, MSL: 12
                'uiccToolkitApplicationSpecificParametersField': h2b('0100000100000002011203B2010000'),
            },
            # Optional: processData
            # Optional: controlReferenceTemplate
        }
        self.decoded['keyList'] = [] # we assume user uses add_key() method for all keys
        # Optional: sdPersoData
        # Optional: openPersoData
        # Optional: catTpParameters
        self._post_decode()

    def _post_decode(self):
        # build the higher-level representations (usip, keys) from 'decoded'
        self.usip = self.C9()
        self.usip.from_bytes(self.decoded['instance']['applicationSpecificParametersC9'])
        self.keys = [SecurityDomainKey.from_saip_dict(x) for x in self.decoded['keyList']]

    def _pre_encode(self):
        # 'decoded' is overwritten from the higher-level representations here,
        # so modifications must be made via self.keys / self.usip, not directly
        # in self.decoded
        self.decoded['keyList'] = [x.to_saip_dict() for x in self.keys]
        self.decoded['instance']['applicationSpecificParametersC9'] = self.usip.to_bytes()

    def has_scp(self, scp: int) -> bool:
        """Determine if SD Installation parameters already specify given SCP."""
        return self.usip.nested_collection.has_scp(scp)

    def add_scp(self, scp: int, i: int):
        """Add given SCP (and i parameter) to list of SCP of the Security Domain Install Params.
        Example: add_scp(0x03, 0x70) for SCP03, or add_scp(0x02, 0x55) for SCP02."""
        self.usip.nested_collection.add_scp(scp, i)
        self._pre_encode()

    def remove_scp(self, scp: int):
        """Remove given SCP from list of SCP of the Security Domain Install Params."""
        self.usip.nested_collection.remove_scp(scp)
        self._pre_encode()

    def find_key(self, key_version_number: int, key_id: int) -> Optional[SecurityDomainKey]:
        """Find and return (if any) the SecurityDomainKey for given KVN + KID."""
        for k in self.keys:
            if k.key_version_number == key_version_number and k.key_identifier == key_id:
                return k
        return None

    def add_key(self, key: SecurityDomainKey):
        """Add a given SecurityDomainKey to the keyList of the securityDomain.

        Raises:
            ValueError: if a key with the same KVN + KID already exists
        """
        if self.find_key(key.key_version_number, key.key_identifier):
            raise ValueError('Key for KVN=0x%02x / KID=0x%02x already exists' % (key.key_version_number,
                                                                                key.key_identifier))
        self.keys.append(key)
        self._pre_encode()

    def remove_key(self, key_version_number: int, key_id: int):
        """Remove the SecurityDomainKey of given KVN + KID from the keyList.

        Raises:
            ValueError: if no key with the given KVN + KID exists
        """
        key = self.find_key(key_version_number, key_id)
        if not key:
            raise ValueError('No key for KVN=0x%02x / KID=0x%02x found' % (key_version_number, key_id))
        self.keys.remove(key)
        self._pre_encode()
|
|
|
|
class ProfileElementSSD(ProfileElementSD):
    """Class representing a securityDomain ProfileElement for a SSD."""

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # defaults [overriding ProfileElementSD) taken from SAIP v2.3.1 Section 11.2.12
        self.decoded['instance'].update({
            'instanceAID': h2b('A00000055910100102736456616C7565'),
            'applicationPrivileges': h2b('808000'),
            'applicationParameters': {
                # TAR: 6C7565, MSL: 12
                'uiccToolkitApplicationSpecificParametersField': h2b('01000001000000020112036C756500'),
            },
        })
|
|
|
|
class ProfileElementApplication(ProfileElement):
    """Class representing an application ProfileElement."""
    type = 'application'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)

    @classmethod
    def from_file(cls,
                  filename: str,
                  aid: Hexstr,
                  sd_aid: Hexstr = None,
                  non_volatile_code_limit: int = None,
                  volatile_data_limit: int = None,
                  non_volatile_data_limit: int = None,
                  hash_value: Hexstr = None) -> 'ProfileElementApplication':
        """Fill contents of application ProfileElement from a .cap file.

        Args:
            filename: name of the .cap or .ijc file to load
            aid: loadPackageAID as hex string
            sd_aid: AID of the associated security domain (optional)
            non_volatile_code_limit: NV code limit (optional)
            volatile_data_limit: volatile data limit (optional)
            non_volatile_data_limit: NV data limit (optional)
            hash_value: hash of the load file as hex string (optional)
        Raises:
            ValueError: if the file name is neither .cap nor .ijc
        """
        inst = cls()
        Construct_data_limit = StripHeaderAdapter(GreedyBytes, 4, steps = [2,4])

        if filename.lower().endswith('.cap'):
            cap = javacard.CapFile(filename)
            load_block_object = cap.get_loadfile()
        elif filename.lower().endswith('.ijc'):
            # bug fix: use a context manager so the file descriptor is closed again
            with open(filename, 'rb') as fd:
                load_block_object = fd.read()
        else:
            raise ValueError('Invalid file type, file must either .cap or .ijc')

        # Mandatory
        inst.decoded['loadBlock'] = {
            'loadPackageAID': h2b(aid),
            'loadBlockObject': load_block_object
        }

        # Optional
        if sd_aid:
            inst.decoded['loadBlock']['securityDomainAID'] = h2b(sd_aid)
        if non_volatile_code_limit:
            inst.decoded['loadBlock']['nonVolatileCodeLimitC6'] = Construct_data_limit.build(non_volatile_code_limit)
        if volatile_data_limit:
            inst.decoded['loadBlock']['volatileDataLimitC7'] = Construct_data_limit.build(volatile_data_limit)
        if non_volatile_data_limit:
            inst.decoded['loadBlock']['nonVolatileDataLimitC8'] = Construct_data_limit.build(non_volatile_data_limit)
        if hash_value:
            inst.decoded['loadBlock']['hashValue'] = h2b(hash_value)

        return inst

    def to_file(self, filename: str):
        """Write loadBlockObject contents of application ProfileElement to a .cap or .ijc file.

        Raises:
            ValueError: if the file name is neither .cap nor .ijc
        """
        load_package_aid = b2h(self.decoded['loadBlock']['loadPackageAID'])
        load_block_object = self.decoded['loadBlock']['loadBlockObject']

        if filename.lower().endswith('.cap'):
            with io.BytesIO(load_block_object) as f, zipfile.ZipFile(filename, 'w') as z:
                javacard.ijc_to_cap(f, z, load_package_aid)
        elif filename.lower().endswith('.ijc'):
            with open(filename, 'wb') as f:
                f.write(load_block_object)
        else:
            raise ValueError('Invalid file type, file must either .cap or .ijc')

    def add_instance(self,
                     aid: Hexstr,
                     class_aid: Hexstr,
                     inst_aid: Hexstr,
                     app_privileges: Hexstr,
                     app_spec_pars: Hexstr,
                     uicc_toolkit_app_spec_pars: Hexstr = None,
                     uicc_access_app_spec_pars: Hexstr = None,
                     uicc_adm_access_app_spec_pars: Hexstr = None,
                     volatile_memory_quota: Hexstr = None,
                     non_volatile_memory_quota: Hexstr = None,
                     process_data: list[Hexstr] = None):
        """Create a new instance and add it to the instanceList.

        Args:
            aid: applicationLoadPackageAID as hex string
            class_aid: classAID as hex string
            inst_aid: instanceAID as hex string
            app_privileges: applicationPrivileges as hex string
            app_spec_pars: applicationSpecificParametersC9 as hex string
            uicc_toolkit_app_spec_pars: toolkit parameters (optional)
            uicc_access_app_spec_pars: access parameters (optional)
            uicc_adm_access_app_spec_pars: administrative access parameters (optional)
            volatile_memory_quota: volatile memory quota (optional)
            non_volatile_memory_quota: non-volatile memory quota (optional)
            process_data: list of process-data hex strings (optional)
        """
        # Mandatory
        inst = {'applicationLoadPackageAID': h2b(aid),
                'classAID': h2b(class_aid),
                'instanceAID': h2b(inst_aid),
                'applicationPrivileges': h2b(app_privileges),
                'applicationSpecificParametersC9': h2b(app_spec_pars)}

        # Optional
        if uicc_toolkit_app_spec_pars or uicc_access_app_spec_pars or uicc_adm_access_app_spec_pars:
            inst['applicationParameters'] = {}
        if uicc_toolkit_app_spec_pars:
            inst['applicationParameters']['uiccToolkitApplicationSpecificParametersField'] = \
                h2b(uicc_toolkit_app_spec_pars)
        if uicc_access_app_spec_pars:
            inst['applicationParameters']['uiccAccessApplicationSpecificParametersField'] = \
                h2b(uicc_access_app_spec_pars)
        if uicc_adm_access_app_spec_pars:
            inst['applicationParameters']['uiccAdministrativeAccessApplicationSpecificParametersField'] = \
                h2b(uicc_adm_access_app_spec_pars)
        if volatile_memory_quota is not None or non_volatile_memory_quota is not None:
            inst['systemSpecificParameters'] = {}
            Construct_data_limit = StripHeaderAdapter(GreedyBytes, 4, steps = [2,4])
            if volatile_memory_quota is not None:
                inst['systemSpecificParameters']['volatileMemoryQuotaC7'] = \
                    Construct_data_limit.build(volatile_memory_quota)
            if non_volatile_memory_quota is not None:
                inst['systemSpecificParameters']['nonVolatileMemoryQuotaC8'] = \
                    Construct_data_limit.build(non_volatile_memory_quota)
        # bug fix: len(process_data) raised TypeError when process_data was left
        # at its default of None
        if process_data:
            inst['processData'] = [h2b(proc) for proc in process_data]

        # Append created instance to instance list
        if 'instanceList' not in self.decoded.keys():
            self.decoded['instanceList'] = []
        self.decoded['instanceList'].append(inst)

    def remove_instance(self, inst_aid: Hexstr):
        """Remove an instance from the instanceList.

        Raises:
            ValueError: if no instance with the given AID exists
        """
        inst_list = self.decoded.get('instanceList', [])
        for idx, inst in enumerate(inst_list):
            if b2h(inst.get('instanceAID', None)) == inst_aid:
                inst_list.pop(idx)
                return
        # bug fix: previously referenced the (possibly unbound) loop variable and
        # printed the whole instance dict instead of the requested AID
        raise ValueError("instance AID: '%s' not present in instanceList, cannot remove instance" % inst_aid)
|
|
|
|
|
|
|
|
class ProfileElementRFM(ProfileElement):
    """Class representing the ProfileElement for RFM (Remote File Management)."""
    type = 'rfm'

    def __init__(self, decoded: Optional[dict] = None,
                 inst_aid: Optional[bytes] = None, sd_aid: Optional[bytes] = None,
                 adf_aid: Optional[bytes] = None,
                 tar_list: Optional[List[bytes]] = None, msl: Optional[int] = 0x06, **kwargs):
        """Initialize either from an asn1tools-decoded structure, or with defaults.

        Args:
            decoded: asn1tools-generated decoded structure for this PE (optional)
            inst_aid: instance AID of the RFM application
            sd_aid: AID of the associated security domain
            adf_aid: AID of an ADF for adfRFMAccess (optional)
            tar_list: list of TAR values; defaults to an empty list
            msl: minimum security level
        """
        super().__init__(decoded, **kwargs)
        ADM1_ACCESS = h2b('02000100')
        if decoded:
            return
        # provide some reasonable defaults for a MNO-SD
        self.decoded['instanceAID'] = inst_aid
        self.decoded['securityDomainAID'] = sd_aid
        # bug fix: tar_list previously used a mutable default argument ([]),
        # which is shared between all invocations
        self.decoded['tarList'] = tar_list if tar_list is not None else []
        self.decoded['minimumSecurityLevel'] = bytes([msl])
        self.decoded['uiccAccessDomain'] = ADM1_ACCESS
        self.decoded['uiccAdminAccessDomain'] = ADM1_ACCESS
        if adf_aid:
            self.decoded['adfRFMAccess'] = {
                'adfAID': adf_aid,
                'adfAccessDomain': ADM1_ACCESS,
                'adfAdminAccessDomain': ADM1_ACCESS,
            }
|
|
|
|
class ProfileElementUSIM(FsProfileElement):
    """Class representing the ProfileElement for ADF.USIM Mandatory Files"""
    type = 'usim'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.ADF_USIM_by_default_v2)
        self.decoded.update({fname: [] for fname in (
            'adf-usim', 'ef-imsi', 'ef-arr', 'ef-ust', 'ef-spn', 'ef-est', 'ef-acc', 'ef-ecc')})

    @property
    def adf_name(self) -> str:
        """DF name (AID) of ADF.USIM as hex string."""
        fcp = self.decoded['adf-usim'][0][1]
        return b2h(fcp['dfName'])

    @property
    def imsi(self) -> Optional[str]:
        """IMSI decoded from the EF.IMSI body of this PE."""
        template = templates.ProfileTemplateRegistry.get_by_oid(self.templateID)
        ef_imsi_tmpl = template.files_by_pename.get('ef-imsi', None)
        f = File('ef-imsi', self.decoded['ef-imsi'], ef_imsi_tmpl)
        return dec_imsi(b2h(f.body))
|
|
|
|
class ProfileElementOptUSIM(FsProfileElement):
    """Class representing the ProfileElement for ADF.USIM Optional Files"""
    type = 'opt-usim'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.ADF_USIMopt_not_by_default_v2)
|
|
|
|
class ProfileElementISIM(FsProfileElement):
    """Class representing the ProfileElement for ADF.ISIM Mandatory Files"""
    type = 'isim'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.ADF_ISIM_by_default)
        self.decoded.update({fname: [] for fname in (
            'adf-isim', 'ef-impi', 'ef-impu', 'ef-domain', 'ef-ist', 'ef-arr')})

    @property
    def adf_name(self) -> str:
        """DF name (AID) of ADF.ISIM as hex string."""
        fcp = self.decoded['adf-isim'][0][1]
        return b2h(fcp['dfName'])
|
|
|
|
class ProfileElementOptISIM(FsProfileElement):
    """Class representing the ProfileElement for ADF.ISIM Optional Files"""
    type = 'opt-isim'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        super().__init__(decoded, **kwargs)
        if decoded:
            # instantiated from parsed profile data: nothing more to do
            return
        # fresh instance: populate reasonable defaults
        self.decoded['templateID'] = str(oid.ADF_ISIMopt_not_by_default_v2)
|
|
|
|
|
|
class ProfileElementAKA(ProfileElement):
    """Class representing the ProfileElement for Authentication and Key Agreement (AKA)."""
    type = 'akaParameter'
    # TODO: RES size for USIM test algorithm can be set to 32, 64 or 128 bits. This value was
    # previously limited to 128 bits. Recommendation: Avoid using RES size 32 or 64 in Profiles
    # downloaded to V2.1 eUICCs.

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        """Initialize either from an asn1tools-decoded structure, or with defaults
        (MILENAGE with all-zero K/OPc)."""
        super().__init__(decoded, **kwargs)
        if decoded:
            return
        # provide some reasonable defaults for a MNO-SD
        self.set_milenage(b'\x00'*16, b'\x00'*16)

    def _fixup_sqnInit_dec(self) -> None:
        """asn1tools has a bug when working with SEQUENCE OF that have DEFAULT values. Let's work around
        this."""
        sqn_init = self.decoded.get('sqnInit', None)
        if not sqn_init:
            return
        # this weird '0x' value in a string is what we get from our (slightly hacked) ASN.1 syntax
        if sqn_init == '0x000000000000':
            # SEQUENCE (SIZE (32)) OF OCTET STRING (SIZE (6))
            self.decoded['sqnInit'] = [b'\x00'*6] * 32

    def _fixup_sqnInit_enc(self) -> None:
        """asn1tools has a bug when working with SEQUENCE OF that have DEFAULT values. Let's work around
        this."""
        sqn_init = self.decoded.get('sqnInit', None)
        if not sqn_init:
            return
        for s in sqn_init:
            if any(s):
                return
        # none of the fields were initialized with a non-default (non-zero) value, so we can skip it
        del self.decoded['sqnInit']

    def _post_decode(self):
        # work around asn1tools bug regarding DEFAULT for a SEQUENCE OF
        self._fixup_sqnInit_dec()

    def _pre_encode(self):
        # work around asn1tools bug regarding DEFAULT for a SEQUENCE OF
        self._fixup_sqnInit_enc()

    def set_milenage(self, k: bytes, opc: bytes):
        """Configure akaParametes for MILENAGE.

        Args:
            k: 16-byte subscriber key K
            opc: 16-byte OPc value
        """
        self.decoded['algoConfiguration'] = ('algoParameter', {
            'algorithmID': 1,
            'algorithmOptions': b'\x00', # not relevant for milenage
            'key': k,
            'opc': opc,
        })

    def set_xor3g(self, k: bytes):
        """Configure akaParametes for XOR-3G.

        Args:
            k: subscriber key K
        """
        self.decoded['algoConfiguration'] = ('algoParameter', {
            'algorithmID': 3,
            'algorithmOptions': b'\x00', # not relevant for XOR-3G
            'key': k,
            'opc': b'', # not used for XOR-3G
        })

    def set_tuak(self, k: bytes, topc: bytes, num_of_keccak: int = 1):
        """Configure akaParametes for TUAK.

        Args:
            k: subscriber key K
            topc: TOPc value
            num_of_keccak: number of Keccak iterations
        """
        self.decoded['algoConfiguration'] = ('algoParameter', {
            'algorithmID': 2,
            'algorithmOptions': b'\x00', # not relevant for TUAK
            'key': k,
            'opc': topc,
            'numberOfKeccak': bytes([num_of_keccak]),
        })

    def set_mapping(self, aid: bytes, options: int = 6):
        """Configure akaParametes for a mapping from another AID.

        Args:
            aid: AID of the application to map from
            options: mappingOptions bit-mask
        """
        self.decoded['algoConfiguration'] = ('mappingParameter', {
            'mappingOptions': bytes([options]),
            'mappingSource': aid,
        })
|
|
|
|
class ProfileElementHeader(ProfileElement):
    """Class representing the ProfileElement for the Header of the PE-Sequence."""
    type = 'header'

    def __init__(self, decoded: Optional[dict] = None,
                 ver_major: Optional[int] = 2, ver_minor: Optional[int] = 3,
                 iccid: Optional[Hexstr] = '0'*20, profile_type: Optional[str] = None,
                 **kwargs):
        """You would usually initialize an instance either with a "decoded" argument (as read from
        a DER-encoded SAIP file via asn1tools), or [some of] the other arguments in case you're
        constructing a Profile Header from scratch.

        Args:
            decoded: asn1tools-generated decoded structure for this PE
            ver_major: Major SAIP version
            ver_minor: Minor SAIP version
            iccid: ICCID of the profile
            profile_type: operational, testing or bootstrap
        """
        super().__init__(decoded, **kwargs)
        if decoded:
            return
        # build the header from scratch with reasonable defaults
        hdr = {
            'major-version': ver_major,
            'minor-version': ver_minor,
            'iccid': h2b(iccid),
            'eUICC-Mandatory-services': {}, # needs to be recomputed at the end
            'eUICC-Mandatory-GFSTEList': [], # needs to be recomputed at the end
        }
        if profile_type:
            hdr['profileType'] = profile_type
        self.decoded = hdr

    def mandatory_service_add(self, service_name):
        """Add a service to the eUICC-Mandatory-services of the header."""
        self.decoded['eUICC-Mandatory-services'][service_name] = None

    def mandatory_service_remove(self, service_name):
        """Remove a service from the eUICC-Mandatory-services of the header.

        Raises:
            ValueError: if the service is not currently listed
        """
        services = self.decoded['eUICC-Mandatory-services']
        if service_name not in services:
            raise ValueError("service not in eUICC-Mandatory-services list, cannot remove")
        del services[service_name]
|
|
|
|
class ProfileElementEnd(ProfileElement):
    """Class representing the ProfileElement for the End of the PE-Sequence."""
    type = 'end'

    def __init__(self, decoded: Optional[dict] = None, **kwargs):
        # the 'end' PE carries no payload of its own beyond the generic PE fields
        super().__init__(decoded, **kwargs)
|
|
def bertlv_first_segment(binary: bytes) -> Tuple[bytes, bytes]:
    """obtain the first segment of a binary concatenation of BER-TLV objects.
    Returns: tuple of first TLV and remainder."""
    # parse tag + length to determine how many bytes the first TLV occupies
    _tagdict, after_tag = bertlv_parse_tag(binary)
    value_len, after_len = bertlv_parse_len(after_tag)
    header_len = len(binary) - len(after_len)
    cut = header_len + value_len
    return binary[:cut], binary[cut:]
|
|
|
|
class ProfileElementSequence:
|
|
"""A sequence of ProfileElement objects, which is the overall representation of an eSIM profile.
|
|
|
|
This primarily contains a list of PEs (pe_list member) as well as a number of convenience indexes
|
|
like the pe_by_type and pes_by_naa dicts that allow easier access to individual PEs within the
|
|
sequence."""
|
|
    def __init__(self):
        """After calling the constructor, you have to further initialize the instance by either
        calling the parse_der() method, or by manually adding individual PEs, including the header and
        end PEs."""
        # ordered list of all PEs of the sequence
        self.pe_list: List[ProfileElement] = []
        # convenience index: PE-type name -> list of PEs of that type
        self.pe_by_type: Dict = {}
        # convenience index: NAA name -> list of instances (each a list of PEs)
        self.pes_by_naa: Dict = {}
        self.mf: Optional[FsNodeMF] = None
        self._cur_df: Optional[FsNodeDF] = None # current DF while adding files from FS-templates
|
|
|
|
    @property
    def cur_df(self) -> Optional['FsNodeDF']:
        """Current DF; this is where the next files are created."""
        return self._cur_df
|
|
|
|
    @cur_df.setter
    def cur_df(self, new_df: 'FsNodeDF'):
        # no-op if the DF does not actually change
        if self._cur_df == new_df:
            return
        self._cur_df = new_df
|
|
|
|
def add_hdr_and_end(self):
|
|
"""Initialize the PE Sequence with a header and end PE."""
|
|
if len(self.pe_list):
|
|
raise ValueError("Cannot add header + end PE to a non-empty PE-Sequence")
|
|
# start with a minimal/empty sequence of header + end
|
|
self.append(ProfileElementHeader())
|
|
self.append(ProfileElementEnd())
|
|
|
|
    def append(self, pe: ProfileElement):
        """Append a given PE to the end of the PE Sequence"""
        self.pe_list.append(pe)
        # keep the convenience indexes and PE identification numbers consistent
        self._process_pelist()
        self.renumber_identification()
|
|
|
|
def get_pes_for_type(self, tname: str) -> List[ProfileElement]:
|
|
"""Return list of profile elements present for given profile element type."""
|
|
return self.pe_by_type.get(tname, [])
|
|
|
|
def get_pe_for_type(self, tname: str) -> Optional[ProfileElement]:
|
|
"""Return a single profile element for given profile element type. Works only for
|
|
types of which there is only a single instance in the PE Sequence!"""
|
|
l = self.get_pes_for_type(tname)
|
|
if len(l) == 0:
|
|
return None
|
|
assert len(l) == 1
|
|
return l[0]
|
|
|
|
def get_pes_for_templateID(self, tid: oid.OID) -> List[ProfileElement]:
|
|
"""Return list of profile elements present for given profile element type."""
|
|
res = []
|
|
for pe in self.pe_list:
|
|
if not pe.templateID:
|
|
continue
|
|
if tid.prefix_match(pe.templateID):
|
|
res.append(pe)
|
|
return res
|
|
|
|
    def get_closest_prev_pe_for_templateID(self, cur: ProfileElement, tid: oid.OID) -> Optional[ProfileElement]:
        """Return the PE of given templateID that is the closest PE prior to the given PE in the
        PE-Sequence.

        Returns None (implicitly) if no earlier PE matches the given templateID."""
        try:
            cur_idx = self.pe_list.index(cur)
        except ValueError:
            # we must be building the pe_list and cur is not yet part: scan from end of list
            cur_idx = len(self.pe_list)
        # walk backwards from just before 'cur' towards the start of the sequence
        for i in reversed(range(cur_idx)):
            pe = self.pe_list[i]
            if not pe.templateID:
                continue
            if tid.prefix_match(pe.templateID):
                return pe
|
|
|
|
    def parse_der(self, der: bytes) -> None:
        """Parse a sequence of PE from SAIP DER format and store the result in self.pe_list.

        Args:
            der: raw DER encoded PE-Sequence (concatenation of BER-TLVs)."""
        self.pe_list = []
        remainder = der
        while len(remainder):
            # peel one complete TLV off the front of the buffer at a time
            first_tlv, remainder = bertlv_first_segment(remainder)
            self.pe_list.append(ProfileElement.from_der(first_tlv, pe_sequence=self))
        self._process_pelist()
|
|
|
|
    def _process_pelist(self) -> None:
        """Post-process the PE-list; update the convenience accessor dicts
        (self.pe_by_type and self.pes_by_naa) after any change to self.pe_list."""
        self._rebuild_pe_by_type()
        self._rebuild_pes_by_naa()
|
|
|
|
def _rebuild_pe_by_type(self) -> None:
|
|
"""Re-build the self.pe_by_type convenience accessor dict."""
|
|
self.pe_by_type = {}
|
|
# build a dict {pe_type: [pe, pe, pe]}
|
|
for pe in self.pe_list:
|
|
if pe.type in self.pe_by_type:
|
|
self.pe_by_type[pe.type].append(pe)
|
|
else:
|
|
self.pe_by_type[pe.type] = [pe]
|
|
|
|
def _rebuild_pes_by_naa(self) -> None:
|
|
"""rebuild the self.pes_by_naa dict {naa: [ [pe, pe, pe], [pe, pe] ]} form,
|
|
which basically means for every NAA there's a list of instances, and each consists
|
|
of a list of a list of PEs."""
|
|
self.pres_by_naa = {}
|
|
petype_not_naa_related = ['securityDomain', 'rfm', 'application', 'end']
|
|
naa = ['mf', 'usim', 'isim', 'csim']
|
|
cur_naa = None
|
|
cur_naa_list = []
|
|
for pe in self.pe_list:
|
|
# skip all PE that are not related to NAA
|
|
if pe.type in petype_not_naa_related:
|
|
continue
|
|
if pe.type in naa:
|
|
if cur_naa:
|
|
if not cur_naa in self.pes_by_naa:
|
|
self.pes_by_naa[cur_naa] = []
|
|
self.pes_by_naa[cur_naa].append(cur_naa_list)
|
|
cur_naa = pe.type
|
|
cur_naa_list = []
|
|
cur_naa_list.append(pe)
|
|
# append the final one
|
|
if cur_naa and len(cur_naa_list) > 0:
|
|
if not cur_naa in self.pes_by_naa:
|
|
self.pes_by_naa[cur_naa] = []
|
|
self.pes_by_naa[cur_naa].append(cur_naa_list)
|
|
|
|
def rebuild_mandatory_services(self):
|
|
"""(Re-)build the eUICC Mandatory services list of the ProfileHeader based on what's in the
|
|
PE-Sequence. You would normally call this at the very end, before encoding a PE-Sequence
|
|
to its DER format."""
|
|
@staticmethod
|
|
def file_type_walker(node: 'FsNode', **kwargs):
|
|
return node.file.file_type
|
|
# services that we cannot auto-determine and which must hence be manually specified
|
|
manual_services = ['contactless', 'mbms', 'cat-tp', 'suciCalculatorApi', 'dns-resolution',
|
|
'scp11ac', 'scp11c-authorization-mechanism', 's16mode', 'eaka']
|
|
svc_set = set()
|
|
# rebuild mandatory NAAs
|
|
for naa in self.pes_by_naa.keys():
|
|
if naa not in ['usim', 'isim', 'csim']:
|
|
continue
|
|
# see if any of the instances is mandatory
|
|
for inst in self.pes_by_naa[naa]:
|
|
if 'mandated' in inst[0].header:
|
|
svc_set.add(naa)
|
|
# rebuild algorithms of all mandatory akaParameters
|
|
for aka in self.get_pes_for_type('akaParameter'):
|
|
if 'mandated' in aka.header:
|
|
if aka.decoded['algoConfiguration'][0] == 'algoParameter':
|
|
algo_par = aka.decoded['algoConfiguration'][1]
|
|
algo_id = algo_par['algorithmID']
|
|
if algo_id == 1:
|
|
svc_set.add('milenage')
|
|
elif algo_id == 2:
|
|
if len(algo_par['key']) == 32:
|
|
svc_set.add('tuak256')
|
|
else:
|
|
svc_set.add('tuak128')
|
|
elif algo_id == 3:
|
|
svc_set.add('usim-test-algorithm')
|
|
# rebuild algorithms of all mandatory cdmaParameter
|
|
for cdma in self.get_pes_for_type('cdmaParameter'):
|
|
if 'mandated' in cdma.header:
|
|
svc_set.add('cave')
|
|
# TODO: gba-{usim,isim} (determine from EF.GBA* ?)
|
|
# determine if EAP is mandatory
|
|
for eap in self.get_pes_for_type('eap'):
|
|
if 'mandated' in eap.header:
|
|
svc_set.add('eap')
|
|
# determine if javacard is mandatory
|
|
for app in self.get_pes_for_type('application'):
|
|
if 'mandated' in app.header:
|
|
# javacard / multos distinction is not automatically possible, but multos is hypothetical
|
|
svc_set.add('javacard')
|
|
# recompute multiple-{usim,isim,csim}
|
|
for naa_name in ['usim','isim','csim']:
|
|
if naa_name in self.pes_by_naa[naa]:
|
|
if len(self.pes_by_naa[naa]) > 1:
|
|
svc_set.add('multiple-' + naa_name)
|
|
# BER-TLV (recursively scan all files for related type)
|
|
ftype_list = self.mf.walk(file_type_walker)
|
|
if 'BT' in ftype_list:
|
|
svc_set.add('ber-tlv')
|
|
# FIXME:dfLinked files (scan all files, check for non-empty Fcp.linkPath presence of DFs)
|
|
# TODO: 5G related bits (derive from EF.UST or file presence?)
|
|
hdr_pe = self.get_pe_for_type('header')
|
|
# patch in the 'manual' services from the existing list:
|
|
for old_svc in hdr_pe.decoded['eUICC-Mandatory-services'].keys():
|
|
if old_svc in manual_services:
|
|
svc_set.add(old_svc)
|
|
hdr_pe.decoded['eUICC-Mandatory-services'] = {x: None for x in svc_set}
|
|
|
|
def rebuild_mandatory_gfstelist(self):
|
|
"""(Re-)build the eUICC Mandatory GFSTEList of the ProfileHeader based on what's in the
|
|
PE-Sequence. You would normally call this at the very end, before encoding a PE-Sequence
|
|
to its DER format."""
|
|
template_set = set()
|
|
for pe in self.pe_list:
|
|
if pe.header and 'mandated' in pe.header:
|
|
if 'templateID' in pe.decoded:
|
|
template_set.add(pe.decoded['templateID'])
|
|
hdr_pe = self.get_pe_for_type('header')
|
|
hdr_pe.decoded['eUICC-Mandatory-GFSTEList'] = list(template_set)
|
|
|
|
    @classmethod
    def from_der(cls, der: bytes) -> 'ProfileElementSequence':
        """Construct an instance from given raw, DER encoded bytes.

        Args:
            der: raw DER encoded PE-Sequence."""
        inst = cls()
        inst.parse_der(der)
        return inst
|
|
|
|
def to_der(self) -> bytes:
|
|
"""Build an encoded DER representation of the instance."""
|
|
out = b''
|
|
for pe in self.pe_list:
|
|
out += pe.to_der()
|
|
return out
|
|
|
|
def renumber_identification(self):
|
|
"""Re-generate the 'identification' numbering of all PE headers."""
|
|
i = 1
|
|
for pe in self.pe_list:
|
|
hdr = pe.header
|
|
if not hdr:
|
|
continue
|
|
pe.header['identification'] = i
|
|
i += 1
|
|
|
|
def get_index_by_pe(self, pe: ProfileElement) -> int:
|
|
"""Return a list with the indices of all instances of PEs of petype."""
|
|
ret = []
|
|
i = 0
|
|
for cur in self.pe_list:
|
|
if cur == pe:
|
|
return i
|
|
i += 1
|
|
raise ValueError('PE %s is not part of PE Sequence' % (pe))
|
|
|
|
    def insert_at_index(self, idx: int, pe: ProfileElement) -> None:
        """Insert a given [new] ProfileElement at given index into the PE Sequence.

        Also refreshes the derived lookup dicts and re-numbers the
        'identification' field of all PE headers."""
        self.pe_list.insert(idx, pe)
        self._process_pelist()
        self.renumber_identification()
|
|
|
|
    def insert_after_pe(self, pe_before: ProfileElement, pe_new: ProfileElement) -> None:
        """Insert a given [new] ProfileElement after a given [existing] PE in the PE Sequence.

        Raises:
            ValueError: if pe_before is not part of this PE Sequence."""
        idx = self.get_index_by_pe(pe_before)
        self.insert_at_index(idx+1, pe_new)
|
|
|
|
def get_index_by_type(self, petype: str) -> List[int]:
|
|
"""Return a list with the indices of all instances of PEs of petype."""
|
|
ret = []
|
|
i = 0
|
|
for pe in self.pe_list:
|
|
if pe.type == petype:
|
|
ret.append(i)
|
|
i += 1
|
|
return ret
|
|
|
|
    def add_ssd(self, ssd: ProfileElementSSD):
        """Add a SSD (Supplementary Security Domain) After MNO-SD/ISD-P.

        Args:
            ssd: the new SSD profile element to insert."""
        # find MNO-SD index: the first securityDomain PE is the MNO-SD/ISD-P
        idx = self.get_index_by_type('securityDomain')[0]
        # insert _after_ MNO-SD
        self.insert_at_index(idx+1, ssd)
|
|
|
|
    def remove_naas_of_type(self, naa: Naa) -> None:
        """Remove all instances of NAAs of given type. This can be used, for example,
        to remove all CSIM NAAs from a profile. Will not just remove the PEs, but also
        any records in 'eUICC-Mandatory-services' or 'eUICC-Mandatory-GFSTEList'.

        Args:
            naa: the Naa class (e.g. NaaCsim) whose instances shall be removed."""
        hdr = self.pe_by_type['header'][0]
        # remove any associated mandatory services
        for service in naa.mandatory_services:
            if service in hdr.decoded['eUICC-Mandatory-services']:
                del hdr.decoded['eUICC-Mandatory-services'][service]
        # remove any associated mandatory filesystem templates
        for template in naa.templates:
            hdr.decoded['eUICC-Mandatory-GFSTEList'] = [x for x in hdr.decoded['eUICC-Mandatory-GFSTEList'] if not template.prefix_match(x)]
        # determine the ADF names (AIDs) of all NAA ADFs; needed below to find related RFM PEs
        naa_adf_names = []
        if naa.pe_types[0] in self.pe_by_type:
            for pe in self.pe_by_type[naa.pe_types[0]]:
                adf_name = naa.adf_name()
                adf = File(adf_name, pe.decoded[adf_name])
                naa_adf_names.append(adf.df_name)
        # remove PEs of each NAA instance
        if naa.name in self.pes_by_naa:
            for inst in self.pes_by_naa[naa.name]:
                # delete all the PEs of the NAA
                self.pe_list = [pe for pe in self.pe_list if pe not in inst]
            self._process_pelist()
        # remove any RFM PEs for the just-removed ADFs
        if 'rfm' in self.pe_by_type:
            to_delete_pes = []
            for rfm_pe in self.pe_by_type['rfm']:
                if 'adfRFMAccess' in rfm_pe.decoded:
                    if rfm_pe.decoded['adfRFMAccess']['adfAID'] in naa_adf_names:
                        to_delete_pes.append(rfm_pe)
            self.pe_list = [pe for pe in self.pe_list if pe not in to_delete_pes]
            self._process_pelist()
        # TODO: remove any records related to the ADFs from EF.DIR
|
|
|
|
@staticmethod
|
|
def naa_for_path(path: Path) -> Optional[Naa]:
|
|
"""determine the NAA for the given path"""
|
|
rel_path = path.relative_to_mf()
|
|
if len(rel_path) == 0:
|
|
# this is the MF itself
|
|
return None
|
|
df = rel_path[0]
|
|
if df == 'ADF.USIM':
|
|
return NaaUsim
|
|
elif df == 'ADF.ISIM':
|
|
return NaaIsim
|
|
elif df == 'ADF.CSIM':
|
|
return NaaCsim
|
|
else:
|
|
return None
|
|
|
|
    @staticmethod
    def peclass_for_path(path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
        """Return the ProfileElement class that can contain a file with given path.

        Returns:
            tuple of (pe_class, file_template).  Falls back to
            (ProfileElementGFM, None) when no template-based PE matches outside
            any NAA.  NOTE: if the path is below a known NAA but none of its PE
            types has a matching template, None is returned implicitly."""
        naa = ProfileElementSequence.naa_for_path(path)
        if naa:
            # TODO: find specific PE within Naa
            for pet_name in naa.pe_types:
                pe_cls = ProfileElement.class_for_petype(pet_name)
                ft = pe_cls().file_template_for_path(path.relative_to_mf(), adf='ADF.'+naa.name.upper())
                if ft:
                    return pe_cls, ft
        else:
            rel_path = path.relative_to_mf()
            if len(rel_path) == 0:
                # the MF itself
                ft = ProfileElementMF().file_template_for_path(Path(['MF']))
                return ProfileElementMF, ft
            f = rel_path[0]
            if f.startswith('EF') or len(path) == 1 and path[0] == 'MF':
                ft = ProfileElementMF().file_template_for_path(path)
                if not ft:
                    return ProfileElementGFM, None
                return ProfileElementMF, ft
            if f == 'DF.CD':
                ft = ProfileElementCD().file_template_for_path(rel_path)
                if not ft:
                    return ProfileElementGFM, None
                return ProfileElementCD, ft
            if f == 'DF.TELECOM':
                ft = ProfileElementTelecom().file_template_for_path(rel_path)
                if not ft:
                    return ProfileElementGFM, None
                return ProfileElementTelecom, ft
            return ProfileElementGFM, None
|
|
|
|
    def pe_for_path(self, path: Path) -> Tuple[Optional[ProfileElement], Optional[templates.FileTemplate]]:
        """Return the ProfileElement instance that can contain a file with matching path. This will
        either be an existing PE within the sequence, or it will be a newly-allocated PE that is
        inserted into the sequence.

        Raises:
            NotImplementedError: if no PE class could be determined for the path."""
        pe_class, ft = ProfileElementSequence.peclass_for_path(path)
        logger.debug("peclass_for_path(%s): %s, %s" % (path, pe_class, repr(ft)))
        if not pe_class:
            raise NotImplementedError('No support for GenericFileManagement yet')
        # check if we already have an instance
        # TODO: this assumes we only have one instance of each PE; exception will be raised if not
        pe = self.get_pe_for_type(pe_class.type)
        if not pe:
            # create a new instance
            pe = pe_class(pe_sequence=self)
            # FIXME: add it at the right location in the pe_sequence
            self.append(pe)
        return pe, ft
|
|
|
|
    def add_file_at_path(self, path: Path, l: List):
        """Add a file at given path. This assumes that there's only one instance of USIM/ISIM/CSIM
        inside the profile, as otherwise the path name would not be globally unique.

        Args:
            path: the (absolute) Path at which the file shall be created
            l: list of file descriptor/content elements as used by the File class
        Returns:
            the newly-created File instance."""
        pe, ft = self.pe_for_path(path)
        logger.debug("pe_for_path(%s): %s, %s" % (path, pe, ft))
        pe_name = ft.pe_name if ft else None
        file = File(pe_name, l, template=ft, name=path[-1])
        # GFM addresses files by path; template-based PEs locate them via the template
        if isinstance(pe, ProfileElementGFM):
            pe.add_file(file, path)
        else:
            pe.add_file(file)
        return file
|
|
|
|
def __repr__(self) -> str:
|
|
return "PESequence(%s: %s)" % (self.iccid, ', '.join([str(x) for x in self.pe_list]))
|
|
|
|
    def __iter__(self) -> 'Iterator[ProfileElement]':
        """Iterate over the PEs of the sequence, in order.
        (NOTE: the previous '-> str' return annotation was wrong.)"""
        yield from self.pe_list
|
|
|
|
@property
|
|
def iccid(self) -> Optional[str]:
|
|
"""The ICCID of the profile."""
|
|
if not 'header' in self.pe_by_type:
|
|
return None
|
|
if len(self.pe_by_type['header']) < 1:
|
|
return None
|
|
pe_hdr_dec = self.pe_by_type['header'][0].decoded
|
|
if not 'iccid' in pe_hdr_dec:
|
|
return None
|
|
return b2h(pe_hdr_dec['iccid'])
|
|
|
|
    def cd(self, path: List[int]):
        """Change the current directory to the [absolute] "path".

        Args:
            path: list of numeric FIDs; a leading 0x3f00 (MF) is permitted
        Raises:
            ValueError: if a path component doesn't exist or is not a DF."""
        path = list(path) # make a copy before pop below
        # remove the leading MF, in case it's specified explicitly
        while len(path) and path[0] == 0x3f00:
            path.pop(0)
        df = self.mf
        for p in path:
            if not p in df.children:
                raise ValueError("%s doesn't contain child %04X" % (df, p))
            df = df.children[p]
            if not isinstance(df, FsNodeDF):
                raise ValueError("%s is not a DF, cannot change into it" % (df))
        self.cur_df = df
|
|
|
|
|
|
class FsNode:
    """A node in the filesystem hierarchy. Each node can have a parent node and any number of children.
    Each node is identified uniquely within the parent by its numeric FID and its optional human-readable
    name. Each node usually is associated with an instance of the File class for the actual content of
    the file. FsNode is the base class used by more specific nodes, such as FsNode{EF,DF,ADF,MF}."""
    def __init__(self, fid: int, parent: Optional['FsNode'], file: Optional[File] = None,
                 name: Optional[str] = None):
        self.fid = fid
        self.file = file
        # parent is intentionally initialized to None: add_child() below refuses
        # to adopt a child that already has a parent, and sets child.parent itself
        self.parent = None
        self._name = name
        # fall back to the name of the associated File, if any
        if not self._name and self.file and self.file.name:
            self._name = self.file.name
        if parent:
            parent.add_child(self)

    def __str__(self):
        return '%s(%s)' % (self.__class__.__name__, self.fid_path_str)

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__, self.fid_path_str, self.name_path_str)

    @property
    def name(self) -> str:
        """Human-readable name; falls back to the 4-hex-digit FID if none was given."""
        return self._name or '%04X' % self.fid

    @property
    def fid_path(self) -> List[int]:
        """Return the path of the node as list of integers (FIDs), MF first."""
        if self.parent and self.parent != self:
            return self.parent.fid_path + [self.fid]
        else:
            return [self.fid]

    @property
    def name_path(self) -> List[str]:
        """Return the path of the node as list of names, MF first."""
        if self.parent and self.parent != self:
            return self.parent.name_path + [self.name]
        else:
            return [self.name]

    @property
    def fid_path_str(self) -> str:
        """Path of the node as slash-separated string of hex FIDs."""
        return "/".join(['%04X' % x for x in self.fid_path])

    @property
    def name_path_str(self) -> str:
        """Path of the node as slash-separated string of names."""
        return "/".join([x for x in self.name_path])

    @property
    def mf(self) -> 'FsNodeMF':
        """Return the MF (root) of the hierarchy."""
        x = self
        # the MF is its own parent (see FsNodeMF), which terminates this loop
        while x.parent != x:
            x = x.parent
        return x

    def walk(self, fn, **kwargs):
        """call 'fn(self, ``**kwargs``) for the File."""
        return [fn(self, **kwargs)]
|
|
|
|
class FsNodeEF(FsNode):
    """An EF (Entry File) in the filesystem hierarchy."""

    def print_tree(self, indent: int = 0):
        """Print this EF (indented according to its tree depth) to stdout."""
        print("%s%s: %s" % (' '*indent, self, repr(self.file)))
|
|
|
|
class FsNodeDF(FsNode):
    """A DF (Dedicated File) in the filesystem hierarchy."""
    def __init__(self, fid: int, parent: 'FsNodeDF', file: Optional[File] = None,
                 name: Optional[str] = None):
        super().__init__(fid, parent, file, name)
        # children are indexed both by numeric FID and by human-readable name
        self.children = {}
        self.children_by_name = {}

    def __iter__(self) -> 'Iterator[FsNode]':
        """Iterator over the children of this DF."""
        yield from self.children.values()

    def __getitem__(self, fid: Union[int, str]) -> FsNode:
        """Access child-nodes via dict-like lookup syntax; accepts either the
        human-readable name or the numeric FID."""
        if fid in self.children_by_name:
            return self.children_by_name[fid]
        else:
            return self.children[fid]

    def add_child(self, child: FsNode):
        """Add a child to the list of children of this DF.

        Raises:
            ValueError: if the child already has a parent."""
        if child.parent:
            raise ValueError('%s already has parent: %s' % (child, child.parent))
        if child.fid in self.children:
            # NOTE: duplicate FIDs are currently tolerated; the old entry is replaced
            #raise ValueError('%s already contains %s' % (self, self.children[child.fid]))
            pass
        if child.name in self.children_by_name:
            # NOTE: likewise for duplicate names
            #raise ValueError('%s already contains %s' % (self, self.children_by_name[child.name]))
            pass
        self.children[child.fid] = child
        self.children_by_name[child.name] = child
        child.parent = self

    def add_file(self, file: File) -> 'FsNodeDF':
        """Create and link an appropriate FsNode for the given 'file' and insert it.
        Returns the new current DF (it might have changed).

        Raises:
            ValueError: if the file already has a node or has an unknown file_type."""
        cur_df = self
        if file.node:
            raise ValueError('File %s already has a node!' % file)
        elif file.file_type in ['TR', 'LF', 'CY', 'BT']:
            file.node = FsNodeEF(file.fid, cur_df, file)
        elif file.file_type == 'DF':
            file.node = FsNodeDF(file.fid, cur_df, file)
            cur_df = file.node
        elif file.file_type == 'ADF':
            # implicit "cd /": ADFs always hang off the MF
            file.node = FsNodeADF(file.df_name, file.fid, cur_df.mf, file)
            cur_df = file.node
        else:
            raise ValueError("Cannot add %s of unknown file_type %s to tree" % (file, file.file_type))
        logger.debug("%s.add_file(%s) -> return cur_df=%s" % (self, file, cur_df))
        return cur_df

    def print_tree(self, indent: int = 0):
        """Recursively print this DF and all its children (indented) to stdout."""
        print("%s%s: %s" % (' '*indent, self, repr(self.file)))
        for c in self: # using the __iter__ method above
            c.print_tree(indent+1)

    def lookup_by_path(self, path: Path) -> FsNode:
        """Look-up a FsNode based on the [name based] given (absolute) path.

        Raises:
            KeyError: if any path component cannot be found."""
        rel_path = path.relative_to_mf()
        cur = self.mf
        for d in rel_path:
            if not d in cur.children_by_name:
                raise KeyError('Could not find %s in %s while looking up %s from %s' % (d, cur, path, self))
            cur = cur.children_by_name[d]
        return cur

    def lookup_by_fidpath(self, path: List[int]) -> FsNode:
        """Look-up a FsNode based on the [fid based] given (absolute) path.

        Raises:
            KeyError: if any path component cannot be found."""
        path = list(path) # make a copy before modification
        # strip an explicitly-specified leading MF
        while len(path) and path[0] == 0x3f00:
            path.pop(0)
        cur = self.mf
        for d in path:
            if not d in cur.children:
                raise KeyError('Could not find %s in %s while looking up %s from %s' % (d, cur, path, self))
            cur = cur.children[d]
        return cur

    def walk(self, fn, **kwargs):
        """call 'fn(self, ``**kwargs``) for the DF and recursively for all children."""
        ret = super().walk(fn, **kwargs)
        for c in self.children.values():
            ret += c.walk(fn, **kwargs)
        return ret
|
|
|
|
class FsNodeADF(FsNodeDF):
    """An ADF (Application Dedicated File) in the filesystem hierarchy."""
    def __init__(self, df_name: Hexstr, fid: Optional[int] = None, parent: Optional[FsNodeDF] = None,
                 file: Optional[File] = None, name: Optional[str] = None):
        # the AID (DF name) by which the ADF is selected; may be None
        self.df_name = df_name
        super().__init__(fid, parent, file, name)

    def __str__(self):
        # self.df_name is usually None for an ADF like ADF.USIM or ADF.ISIM so we need to guard against it
        return '%s(%s)' % (self.__class__.__name__, b2h(self.df_name) if self.df_name else None)
|
|
|
|
class FsNodeMF(FsNodeDF):
    """The MF (Master File) in the filesystem hierarchy."""
    def __init__(self, file: Optional[File] = None):
        super().__init__(0x3f00, parent=None, file=file, name='MF')
        # the MF is its own parent; this terminates upward traversal (see FsNode.mf)
        self.parent = self
|