diff --git a/pySim/global_platform/__init__.py b/pySim/global_platform/__init__.py index 306746f6..e836e088 100644 --- a/pySim/global_platform/__init__.py +++ b/pySim/global_platform/__init__.py @@ -35,6 +35,10 @@ from pySim.filesystem import * from pySim.profile import CardProfile from pySim.ota import SimFileAccessAndToolkitAppSpecParams from pySim.javacard import CapFile +from pySim.runtime import RuntimeLchan +from pySim.log import PySimLogger + +log = PySimLogger.get(__name__) # GPCS Table 11-48 Load Parameter Tags class NonVolatileCodeMinMemoryReq(BER_TLV_IE, tag=0xC6): @@ -543,7 +547,8 @@ class ADF_SD(CardADF): self._cmd.poutput('Unknown data object "%s", available options: %s' % (tlv_cls_name, do_names)) return - (data, _sw) = self._cmd.lchan.scc.get_data(cla=0x80, tag=tlv_cls.tag) + + data = get_data(self._cmd.lchan, tag=tlv_cls.tag) ie = tlv_cls() ie.from_tlv(h2b(data)) self._cmd.poutput_json(ie.to_dict()) @@ -564,27 +569,7 @@ class ADF_SD(CardADF): """Perform the GlobalPlatform STORE DATA command in order to store some card-specific data. See GlobalPlatform CardSpecification v2.3 Section 11.11 for details.""" response_permitted = opts.response == 'may_be_returned' - self.store_data(h2b(opts.DATA), opts.data_structure, opts.encryption, response_permitted) - - def store_data(self, data: bytes, structure:str = 'none', encryption:str = 'none', response_permitted: bool = False) -> bytes: - """Perform the GlobalPlatform STORE DATA command in order to store some card-specific data. - See GlobalPlatform CardSpecification v2.3 Section 11.11 for details.""" - max_cmd_len = self._cmd.lchan.scc.max_cmd_len - # Table 11-89 of GP Card Specification v2.3 - remainder = data - block_nr = 0 - response = '' - while len(remainder): - chunk = remainder[:max_cmd_len] - remainder = remainder[max_cmd_len:] - p1b = build_construct(ADF_SD.StoreData, - {'last_block': len(remainder) == 0, 'encryption': encryption, - 'structure': structure, 'response': response_permitted}) - hdr = "80E2%02x%02x%02x" % (p1b[0], block_nr, len(chunk)) - data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(chunk) + "00") - block_nr += 1 - response += data - return h2b(response) + store_data(self._cmd.lchan, h2b(opts.DATA), opts.data_structure, opts.encryption, response_permitted) put_key_parser = argparse.ArgumentParser() put_key_parser.add_argument('--old-key-version-nr', type=auto_uint8, default=0, help='Old Key Version Number') @@ -635,20 +620,8 @@ class ADF_SD(CardADF): p2 = opts.key_id if len(opts.key_type) > 1: p2 |= 0x80 - self.put_key(opts.old_key_version_nr, opts.key_version_nr, p2, kdb) + put_key(self._cmd.lchan, opts.old_key_version_nr, opts.key_version_nr, p2, kdb) - # Table 11-68: Key Data Field - Format 1 (Basic Format) - KeyDataBasic = GreedyRange(Struct('key_type'/KeyType, - 'kcb'/Prefixed(Int8ub, GreedyBytes), - 'kcv'/Prefixed(Int8ub, GreedyBytes))) - - def put_key(self, old_kvn:int, kvn: int, kid: int, key_dict: dict) -> bytes: - """Perform the GlobalPlatform PUT KEY command in order to store a new key on the card. - See GlobalPlatform CardSpecification v2.3 Section 11.8 for details.""" - key_data = kvn.to_bytes(1, 'big') + build_construct(ADF_SD.AddlShellCommands.KeyDataBasic, key_dict) - hdr = "80D8%02x%02x%02x" % (old_kvn, kid, len(key_data)) - data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(key_data) + "00") - return data get_status_parser = argparse.ArgumentParser() get_status_parser.add_argument('subset', choices=list(StatusSubset.ksymapping.values()), @@ -660,31 +633,10 @@ class ADF_SD(CardADF): def do_get_status(self, opts): """Perform GlobalPlatform GET STATUS command in order to retrieve status information on Issuer Security Domain, Executable Load File, Executable Module or Applications.""" - grd_list = self.get_status(opts.subset, opts.aid) + grd_list = get_status(self._cmd.lchan, opts.subset, opts.aid) for grd in grd_list: self._cmd.poutput_json(grd.to_dict()) - def get_status(self, subset:str, aid_search_qualifier:Hexstr = '') -> List[GpRegistryRelatedData]: - subset_hex = b2h(build_construct(StatusSubset, subset)) - aid = ApplicationAID(decoded=aid_search_qualifier) - cmd_data = aid.to_tlv() + h2b('5c054f9f70c5cc') - p2 = 0x02 # TLV format according to Table 11-36 - grd_list = [] - while True: - hdr = "80F2%s%02x%02x" % (subset_hex, p2, len(cmd_data)) - data, sw = self._cmd.lchan.scc.send_apdu(hdr + b2h(cmd_data) + "00") - remainder = h2b(data) - while len(remainder): - # tlv sequence, each element is one GpRegistryRelatedData() - grd = GpRegistryRelatedData() - _dec, remainder = grd.from_tlv(remainder) - grd_list.append(grd) - if sw != '6310': - return grd_list - else: - p2 |= 0x01 - return grd_list - set_status_parser = argparse.ArgumentParser() set_status_parser.add_argument('scope', choices=list(SetStatusScope.ksymapping.values()), help='Defines the scope of the requested status change') @@ -698,14 +650,7 @@ class ADF_SD(CardADF): """Perform GlobalPlatform SET STATUS command in order to change the life cycle state of the Issuer Security Domain, Supplementary Security Domain or Application. This normally requires prior authentication with a Secure Channel Protocol.""" - self.set_status(opts.scope, opts.status, opts.aid) - - def set_status(self, scope:str, status:str, aid:Hexstr = ''): - SetStatus = Struct(Const(0x80, Byte), Const(0xF0, Byte), - 'scope'/SetStatusScope, 'status'/CLifeCycleState, - 'aid'/Prefixed(Int8ub, COptional(GreedyBytes))) - apdu = build_construct(SetStatus, {'scope':scope, 'status':status, 'aid':aid}) - _data, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(apdu)) + set_status(self._cmd.lchan, opts.scope, opts.status, opts.aid) inst_perso_parser = argparse.ArgumentParser() inst_perso_parser.add_argument('application_aid', type=is_hexstr, help='Application AID') @@ -715,7 +660,8 @@ class ADF_SD(CardADF): """Perform GlobalPlatform INSTALL [for personalization] command in order to inform a Security Domain that the following STORE DATA commands are meant for a specific AID (specified here).""" # Section 11.5.2.3.6 / Table 11-47 - self.install(0x20, 0x00, "0000%02x%s000000" % (len(opts.application_aid)//2, opts.application_aid)) + install(self._cmd.lchan, 0x20, 0x00, "0000%02x%s000000" % + (len(opts.application_aid)//2, opts.application_aid)) inst_inst_parser = argparse.ArgumentParser() inst_inst_parser.add_argument('--load-file-aid', type=is_hexstr, default='', @@ -750,7 +696,7 @@ class ADF_SD(CardADF): # convert from list to "true-dict" as required by construct.FlagsEnum decoded['privileges'] = {x: True for x in decoded['privileges']} ifi_bytes = build_construct(InstallForInstallCD, decoded) - self.install(p1, 0x00, b2h(ifi_bytes)) + install(self._cmd.lchan, p1, 0x00, b2h(ifi_bytes)) inst_load_parser = argparse.ArgumentParser() inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True, @@ -775,11 +721,7 @@ class ADF_SD(CardADF): 'load_parameters'/Prefixed(Int8ub, GreedyBytes), 'load_token'/Prefixed(Int8ub, GreedyBytes)) ifl_bytes = build_construct(InstallForLoadCD, vars(opts)) - self.install(0x02, 0x00, b2h(ifl_bytes)) - - def install(self, p1:int, p2:int, data:Hexstr) -> ResTuple: - cmd_hex = "80E6%02x%02x%02x%s00" % (p1, p2, len(data)//2, data) - return self._cmd.lchan.scc.send_apdu_checksw(cmd_hex) + install(self._cmd.lchan, 0x02, 0x00, b2h(ifl_bytes)) del_cc_parser = argparse.ArgumentParser() del_cc_parser.add_argument('aid', type=is_hexstr, @@ -793,7 +735,7 @@ class ADF_SD(CardADF): File, an Application or an Executable Load File and its related Applications.""" p2 = 0x80 if opts.delete_related_objects else 0x00 aid = ApplicationAID(decoded=opts.aid) - self.delete(0x00, p2, b2h(aid.to_tlv())) + delete(self._cmd.lchan, 0x00, p2, b2h(aid.to_tlv())) del_key_parser = argparse.ArgumentParser() del_key_parser.add_argument('--key-id', type=auto_uint7, help='Key Identifier (KID)') @@ -814,11 +756,7 @@ class ADF_SD(CardADF): cmd += "d001%02x" % opts.key_id if opts.key_ver is not None: cmd += "d201%02x" % opts.key_ver - self.delete(0x00, p2, cmd) - - def delete(self, p1:int, p2:int, data:Hexstr) -> ResTuple: - cmd_hex = "80E4%02x%02x%02x%s00" % (p1, p2, len(data)//2, data) - return self._cmd.lchan.scc.send_apdu_checksw(cmd_hex) + delete(self._cmd.lchan, 0x00, p2, cmd) load_parser = argparse.ArgumentParser() load_parser_from_grp = load_parser.add_mutually_exclusive_group(required=True) @@ -831,33 +769,15 @@ class ADF_SD(CardADF): """Perform a GlobalPlatform LOAD command. (We currently only support loading without DAP and without ciphering.)""" if opts.from_hex is not None: - self.load(h2b(opts.from_hex)) + load(self._cmd.lchan, h2b(opts.from_hex)) elif opts.from_file is not None: - self.load(opts.from_file.read()) + load(self._cmd.lchan, opts.from_file.read()) elif opts.from_cap_file is not None: cap = CapFile(opts.from_cap_file) - self.load(cap.get_loadfile()) + load(self._cmd.lchan, cap.get_loadfile()) else: raise ValueError('load source not specified!') - def load(self, contents:bytes, chunk_len:int = 240): - # TODO:tune chunk_len based on the overhead of the used SCP? - # build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case - remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents - # transfer this in various chunks to the card - total_size = len(remainder) - block_nr = 0 - while len(remainder): - block = remainder[:chunk_len] - remainder = remainder[chunk_len:] - # build LOAD command APDU according to GPC_SPE_034 section 11.6.2 / Table 11-56 - p1 = 0x00 if len(remainder) else 0x80 - p2 = block_nr % 256 - block_nr += 1 - cmd_hex = "80E8%02x%02x%02x%s00" % (p1, p2, len(block), b2h(block)) - _rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex) - self._cmd.poutput("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!" % (total_size, block_nr)) - install_cap_parser = argparse.ArgumentParser(usage='%(prog)s FILE [--install-parameters | --install-parameters-*]') install_cap_parser.add_argument('cap_file', type=str, metavar='FILE', help='JAVA-CARD CAP file to install') @@ -918,7 +838,7 @@ class ADF_SD(CardADF): self._cmd.poutput("step #1: install for load...") self.do_install_for_load("--load-file-aid %s --security-domain-aid %s" % (load_file_aid, security_domain_aid)) self._cmd.poutput("step #2: load...") - self.load(load_file) + load(self._cmd.lchan, load_file) self._cmd.poutput("step #3: install_for_install (and make selectable)...") self.do_install_for_install("--load-file-aid %s --module-aid %s --application-aid %s --install-parameters %s --make-selectable" % (load_file_aid, module_aid, application_aid, install_parameters)) @@ -958,7 +878,7 @@ class ADF_SD(CardADF): host_challenge = h2b(opts.host_challenge) if opts.host_challenge else get_random_bytes(8) kset = GpCardKeyset(opts.key_ver, h2b(opts.key_enc), h2b(opts.key_mac), h2b(opts.key_dek)) scp02 = SCP02(card_keys=kset) - self._establish_scp(scp02, host_challenge, opts.security_level) + establish_scp(self._cmd.lchan, scp02, host_challenge, opts.security_level) est_scp03_parser = deepcopy(est_scp02_parser) est_scp03_parser.description = None @@ -986,27 +906,14 @@ class ADF_SD(CardADF): host_challenge = h2b(opts.host_challenge) if opts.host_challenge else get_random_bytes(s_mode) kset = GpCardKeyset(opts.key_ver, h2b(opts.key_enc), h2b(opts.key_mac), h2b(opts.key_dek)) scp03 = SCP03(card_keys=kset, s_mode = s_mode) - self._establish_scp(scp03, host_challenge, opts.security_level) - - def _establish_scp(self, scp, host_challenge, security_level): - # perform the common functionality shared by SCP02 and SCP03 establishment - init_update_apdu = scp.gen_init_update_apdu(host_challenge=host_challenge) - init_update_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(init_update_apdu)) - scp.parse_init_update_resp(h2b(init_update_resp)) - ext_auth_apdu = scp.gen_ext_auth_apdu(security_level) - _ext_auth_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(ext_auth_apdu)) - self._cmd.poutput("Successfully established a %s secure channel" % str(scp)) - # store a reference to the SCP instance - self._cmd.lchan.scc.scp = scp - self._cmd.update_prompt() - + establish_scp(self._cmd.lchan, scp03, host_challenge, opts.security_level) def do_release_scp(self, _opts): """Release a previously establiehed secure channel.""" if not self._cmd.lchan.scc.scp: self._cmd.poutput("Cannot release SCP as none is established") return - self._cmd.lchan.scc.scp = None + release_scp(self._cmd.lchan) self._cmd.update_prompt() # Card Application of a Security Domain @@ -1075,3 +982,120 @@ def compute_kcv(key_type: str, key: bytes) -> Optional[bytes]: return None else: return kcv_calculator(key)[:3] + +def store_data(lchan: RuntimeLchan, data: bytes, structure:str = 'none', encryption:str = 'none', + response_permitted: bool = False) -> bytes: + """ + Perform the GlobalPlatform STORE DATA command in order to store some card-specific data. + See GlobalPlatform CardSpecification v2.3 Section 11.11 for details. + """ + max_cmd_len = lchan.scc.max_cmd_len + # Table 11-89 of GP Card Specification v2.3 + remainder = data + block_nr = 0 + response = '' + while len(remainder): + chunk = remainder[:max_cmd_len] + remainder = remainder[max_cmd_len:] + p1b = build_construct(ADF_SD.StoreData, + {'last_block': len(remainder) == 0, 'encryption': encryption, + 'structure': structure, 'response': response_permitted}) + hdr = "80E2%02x%02x%02x" % (p1b[0], block_nr, len(chunk)) + data, _sw = lchan.scc.send_apdu_checksw(hdr + b2h(chunk) + "00") + block_nr += 1 + response += data + return h2b(response) + + + +def get_data(lchan: RuntimeLchan, tag: int) -> bytes: + (data, _sw) = lchan.scc.get_data(cla=0x80, tag=tag) + return data + +def put_key(lchan: RuntimeLchan, old_kvn:int, kvn: int, kid: int, key_dict: dict) -> bytes: + """ + Perform the GlobalPlatform PUT KEY command in order to store a new key on the card. + See GlobalPlatform CardSpecification v2.3 Section 11.8 for details. + """ + + # Table 11-68: Key Data Field - Format 1 (Basic Format) + KeyDataBasic = GreedyRange(Struct('key_type'/KeyType, + 'kcb'/Prefixed(Int8ub, GreedyBytes), + 'kcv'/Prefixed(Int8ub, GreedyBytes))) + + key_data = kvn.to_bytes(1, 'big') + build_construct(KeyDataBasic, key_dict) + hdr = "80D8%02x%02x%02x" % (old_kvn, kid, len(key_data)) + data, _sw = lchan.scc.send_apdu_checksw(hdr + b2h(key_data) + "00") + return data + +def get_status(lchan: RuntimeLchan, subset:str, aid_search_qualifier:Hexstr = '') -> List[GpRegistryRelatedData]: + subset_hex = b2h(build_construct(StatusSubset, subset)) + aid = ApplicationAID(decoded=aid_search_qualifier) + cmd_data = aid.to_tlv() + h2b('5c054f9f70c5cc') + p2 = 0x02 # TLV format according to Table 11-36 + grd_list = [] + while True: + hdr = "80F2%s%02x%02x" % (subset_hex, p2, len(cmd_data)) + data, sw = lchan.scc.send_apdu(hdr + b2h(cmd_data) + "00") + remainder = h2b(data) + while len(remainder): + # tlv sequence, each element is one GpRegistryRelatedData() + grd = GpRegistryRelatedData() + _dec, remainder = grd.from_tlv(remainder) + grd_list.append(grd) + if sw != '6310': + return grd_list + else: + p2 |= 0x01 + return grd_list + +def set_status(lchan: RuntimeLchan, scope:str, status:str, aid:Hexstr = ''): + SetStatus = Struct(Const(0x80, Byte), Const(0xF0, Byte), + 'scope'/SetStatusScope, 'status'/CLifeCycleState, + 'aid'/Prefixed(Int8ub, COptional(GreedyBytes))) + apdu = build_construct(SetStatus, {'scope':scope, 'status':status, 'aid':aid}) + _data, _sw = lchan.scc.send_apdu_checksw(b2h(apdu)) + + inst_perso_parser = argparse.ArgumentParser() + inst_perso_parser.add_argument('application_aid', type=is_hexstr, help='Application AID') + +def install(lchan: RuntimeLchan, p1:int, p2:int, data:Hexstr) -> ResTuple: + cmd_hex = "80E6%02x%02x%02x%s00" % (p1, p2, len(data)//2, data) + return lchan.scc.send_apdu_checksw(cmd_hex) + +def delete(lchan: RuntimeLchan, p1:int, p2:int, data:Hexstr) -> ResTuple: + cmd_hex = "80E4%02x%02x%02x%s00" % (p1, p2, len(data)//2, data) + return lchan.scc.send_apdu_checksw(cmd_hex) + +def load(lchan: RuntimeLchan, contents:bytes, chunk_len:int = 240): + # TODO:tune chunk_len based on the overhead of the used SCP? + # build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case + remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents + # transfer this in various chunks to the card + total_size = len(remainder) + block_nr = 0 + while len(remainder): + block = remainder[:chunk_len] + remainder = remainder[chunk_len:] + # build LOAD command APDU according to GPC_SPE_034 section 11.6.2 / Table 11-56 + p1 = 0x00 if len(remainder) else 0x80 + p2 = block_nr % 256 + block_nr += 1 + cmd_hex = "80E8%02x%02x%02x%s00" % (p1, p2, len(block), b2h(block)) + _rsp_hex, _sw = lchan.scc.send_apdu_checksw(cmd_hex) + log.info("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!", + total_size, block_nr) + +def establish_scp(lchan: RuntimeLchan, scp, host_challenge, security_level): + # perform the common functionality shared by SCP02 and SCP03 establishment + init_update_apdu = scp.gen_init_update_apdu(host_challenge=host_challenge) + init_update_resp, _sw = lchan.scc.send_apdu_checksw(b2h(init_update_apdu)) + scp.parse_init_update_resp(h2b(init_update_resp)) + ext_auth_apdu = scp.gen_ext_auth_apdu(security_level) + _ext_auth_resp, _sw = lchan.scc.send_apdu_checksw(b2h(ext_auth_apdu)) + log.info("Successfully established a %s secure channel", str(scp)) + # store a reference to the SCP instance + lchan.scc.scp = scp + +def release_scp(lchan: RuntimeLchan): + lchan.scc.scp = None