pySim-shell: add "fsdump" command
This command exports the entire filesystem state as one JSON document, which can be useful for storing it in a noSQL database, or for doing a structured diff between different such dumps. It's similar to "export", but then reasonably different to rectify a separate command. Change-Id: Ib179f57bc04d394efe11003ba191dca6098192d3
This commit is contained in:
183
pySim-shell.py
183
pySim-shell.py
@@ -554,10 +554,10 @@ class PySimCommands(CommandSet):
|
||||
context['COUNT'] += 1
|
||||
df = self._cmd.lchan.selected_file
|
||||
|
||||
# The currently selected file (not the file we are going to export)
|
||||
# must always be an ADF or DF. From this starting point we select
|
||||
# the EF we want to export. To maintain consistency we will then
|
||||
# select the current DF again (see comment below).
|
||||
# The currently selected file (not the file we are going to export)
|
||||
# must always be an ADF or DF. From this starting point we select
|
||||
# the EF we want to export. To maintain consistency we will then
|
||||
# select the current DF again (see comment below).
|
||||
if not isinstance(df, CardDF):
|
||||
raise RuntimeError(
|
||||
"currently selected file %s is not a DF or ADF" % str(df))
|
||||
@@ -640,6 +640,181 @@ class PySimCommands(CommandSet):
|
||||
raise RuntimeError(
|
||||
"unable to export %i dedicated files(s)%s" % (context['ERR'], exception_str_add))
|
||||
|
||||
def fsdump_df(self, context, as_json):
|
||||
"""Dump information about currently selected [A]DF"""
|
||||
df = self._cmd.lchan.selected_file
|
||||
df_path_list = df.fully_qualified_path(True)
|
||||
df_path = df.fully_qualified_path_str(True)
|
||||
|
||||
res = {
|
||||
'path': df_path_list,
|
||||
}
|
||||
|
||||
try:
|
||||
if not self._cmd.lchan.selected_file_fcp_hex:
|
||||
# An application without a real ADF (like ADF.ARA-M) / filesystem
|
||||
return
|
||||
|
||||
fcp_dec = self._cmd.lchan.selected_file_fcp
|
||||
res['fcp_raw'] = str(self._cmd.lchan.selected_file_fcp_hex)
|
||||
res['fcp'] = fcp_dec
|
||||
|
||||
except SwMatchError as e:
|
||||
res['error'] = {
|
||||
'sw_actual': e.sw_actual,
|
||||
'sw_expected': e.sw_expected,
|
||||
'message': e.description,
|
||||
}
|
||||
except Exception as e:
|
||||
raise(e)
|
||||
res['error'] = {
|
||||
'message': str(e)
|
||||
}
|
||||
|
||||
context['result']['files'][df_path] = res
|
||||
|
||||
def fsdump_ef(self, filename, context, as_json):
|
||||
"""Select and dump a single elementary file (EF) """
|
||||
# TODO: this is very similar to export_ef(), but I couldn't really come up with a way to share
|
||||
# code between the two. They only hypothetical option could be turn "export" into a mere
|
||||
# post-processing / printing function that works on the fsdump-generated dict/json?
|
||||
df = self._cmd.lchan.selected_file
|
||||
|
||||
# The currently selected file (not the file we are going to export)
|
||||
# must always be an ADF or DF. From this starting point we select
|
||||
# the EF we want to export. To maintain consistency we will then
|
||||
# select the current DF again (see comment below).
|
||||
if not isinstance(df, CardDF):
|
||||
raise RuntimeError("currently selected file %s is not a DF or ADF" % str(df))
|
||||
|
||||
df_path_list = df.fully_qualified_path(True)
|
||||
df_path = df.fully_qualified_path_str(True)
|
||||
df_path_fid = df.fully_qualified_path_str(False)
|
||||
|
||||
file_str = df_path + "/" + str(filename)
|
||||
|
||||
res = {
|
||||
'path': df_path_list + [str(filename)],
|
||||
}
|
||||
|
||||
try:
|
||||
fcp_dec = self._cmd.lchan.select(filename, self._cmd)
|
||||
|
||||
res['fcp_raw'] = str(self._cmd.lchan.selected_file_fcp_hex)
|
||||
res['fcp'] = fcp_dec
|
||||
|
||||
structure = self._cmd.lchan.selected_file_structure()
|
||||
if structure == 'transparent':
|
||||
if as_json:
|
||||
result = self._cmd.lchan.read_binary_dec()
|
||||
body = result[0]
|
||||
else:
|
||||
result = self._cmd.lchan.read_binary()
|
||||
body = str(result[0])
|
||||
elif structure == 'cyclic' or structure == 'linear_fixed':
|
||||
body = []
|
||||
# Use number of records specified in select response
|
||||
num_of_rec = self._cmd.lchan.selected_file_num_of_rec()
|
||||
if num_of_rec:
|
||||
for r in range(1, num_of_rec + 1):
|
||||
if as_json:
|
||||
result = self._cmd.lchan.read_record_dec(r)
|
||||
body.append(result[0])
|
||||
else:
|
||||
result = self._cmd.lchan.read_record(r)
|
||||
body.append(str(result[0]))
|
||||
|
||||
# When the select response does not return the number of records, read until we hit the
|
||||
# first record that cannot be read.
|
||||
else:
|
||||
r = 1
|
||||
while True:
|
||||
try:
|
||||
if as_json:
|
||||
result = self._cmd.lchan.read_record_dec(r)
|
||||
body.append(result[0])
|
||||
else:
|
||||
result = self._cmd.lchan.read_record(r)
|
||||
body.append(str(result[0]))
|
||||
except SwMatchError as e:
|
||||
# We are past the last valid record - stop
|
||||
if e.sw_actual == "9402":
|
||||
break
|
||||
# Some other problem occurred
|
||||
raise e
|
||||
r = r + 1
|
||||
elif structure == 'ber_tlv':
|
||||
tags = self._cmd.lchan.retrieve_tags()
|
||||
body = {}
|
||||
for t in tags:
|
||||
result = self._cmd.lchan.retrieve_data(t)
|
||||
(tag, l, val, remainer) = bertlv_parse_one(h2b(result[0]))
|
||||
body[t] = b2h(val)
|
||||
else:
|
||||
raise RuntimeError('Unsupported structure "%s" of file "%s"' % (structure, filename))
|
||||
res['body'] = body
|
||||
|
||||
except SwMatchError as e:
|
||||
res['error'] = {
|
||||
'sw_actual': e.sw_actual,
|
||||
'sw_expected': e.sw_expected,
|
||||
'message': e.description,
|
||||
}
|
||||
except Exception as e:
|
||||
raise(e)
|
||||
res['error'] = {
|
||||
'message': str(e)
|
||||
}
|
||||
|
||||
context['result']['files'][file_str] = res
|
||||
|
||||
# When reading the file is done, make sure the parent file is
|
||||
# selected again. This will be the usual case, however we need
|
||||
# to check before since we must not select the same DF twice
|
||||
if df != self._cmd.lchan.selected_file:
|
||||
self._cmd.lchan.select(df.fid or df.aid, self._cmd)
|
||||
|
||||
|
||||
fsdump_parser = argparse.ArgumentParser()
|
||||
fsdump_parser.add_argument(
|
||||
'--filename', type=str, default=None, help='only export specific (named) file')
|
||||
fsdump_parser.add_argument(
|
||||
'--json', action='store_true', help='export file contents as JSON (less reliable)')
|
||||
|
||||
@cmd2.with_argparser(fsdump_parser)
|
||||
def do_fsdump(self, opts):
|
||||
"""Export filesystem metadata and file contents of all files below current DF in
|
||||
machine-readable json format. This is similar to "export", but much easier to parse by
|
||||
downstream processing tools. You usually may want to call this from the MF and verify
|
||||
the ADM1 PIN (if available) to maximize the amount of readable files."""
|
||||
result = {
|
||||
'name': self._cmd.card.name,
|
||||
'atr': self._cmd.rs.identity['ATR'],
|
||||
'eid': self._cmd.rs.identity.get('EID', None),
|
||||
'iccid': self._cmd.rs.identity.get('ICCID', None),
|
||||
'aids': {x.aid:{} for x in self._cmd.rs.mf.applications.values()},
|
||||
'files': {},
|
||||
}
|
||||
context = {'result': result, 'DF_SKIP': 0, 'DF_SKIP_REASON': []}
|
||||
kwargs_export = {'as_json': opts.json}
|
||||
exception_str_add = ""
|
||||
|
||||
if opts.filename:
|
||||
# export only that one specified file
|
||||
self.fsdump_ef(opts.filename, context, **kwargs_export)
|
||||
else:
|
||||
# export an entire subtree
|
||||
try:
|
||||
self.walk(0, self.fsdump_ef, self.fsdump_df, context, **kwargs_export)
|
||||
except Exception as e:
|
||||
print("# Stopping early here due to exception: " + str(e))
|
||||
print("#")
|
||||
exception_str_add = ", also had to stop early due to exception:" + str(e)
|
||||
#raise e
|
||||
|
||||
self._cmd.poutput_json(context['result'])
|
||||
|
||||
|
||||
def do_desc(self, opts):
|
||||
"""Display human readable file description for the currently selected file"""
|
||||
desc = self._cmd.lchan.selected_file.desc
|
||||
|
||||
Reference in New Issue
Block a user