forked from public/pysim
add SUCI parameters
Change-Id: I3c0793b8a67bbd0c8247784bd3b5cbd265f94ec2
This commit is contained in:
@@ -20,13 +20,14 @@ import io
|
||||
import os
|
||||
import re
|
||||
import pprint
|
||||
import json
|
||||
from typing import List, Tuple, Generator, Optional
|
||||
|
||||
from construct.core import StreamError
|
||||
from osmocom.tlv import camel_to_snake
|
||||
from osmocom.utils import hexstr
|
||||
from pySim.utils import enc_iccid, dec_iccid, enc_imsi, dec_imsi, h2b, b2h, rpad, sanitize_iccid
|
||||
from pySim.ts_31_102 import EF_AD
|
||||
from pySim.ts_31_102 import EF_AD, EF_UST, EF_Routing_Indicator, EF_SUCI_Calc_Info
|
||||
from pySim.ts_51_011 import EF_SMSP
|
||||
from pySim.esim.saip import param_source
|
||||
from pySim.esim.saip import ProfileElement, ProfileElementSD, ProfileElementSequence
|
||||
@@ -1194,3 +1195,184 @@ class TuakNumberOfKeccak(IntegerParam, AlgoConfig):
|
||||
max_val = 255
|
||||
example_input = '1'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
|
||||
class EfUstServiceParam(EnumParam):
|
||||
"""superclass for EF-UST service flag parameters"""
|
||||
service_idx = 0
|
||||
value_map = { 'enabled': True, 'disabled': False }
|
||||
default_source = param_source.ConstantSource
|
||||
example_input = sorted(value_map.keys())[0]
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
f_ust = pe.files['ef-ust']
|
||||
ef_ust = EF_UST()
|
||||
ust = ef_ust.decode_bin(f_ust.body)
|
||||
|
||||
ust[cls.service_idx]['activated'] = val
|
||||
|
||||
f_ust.body = ef_ust.encode_bin(ust)
|
||||
pe.file2pe(f_ust)
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('usim'):
|
||||
f_ust = pe.files.get('ef-ust', None)
|
||||
if not f_ust:
|
||||
continue
|
||||
ef_ust = EF_UST()
|
||||
try:
|
||||
ust = ef_ust.decode_bin(f_ust.body)
|
||||
|
||||
service_flag = ust[cls.service_idx]['activated']
|
||||
yield { cls.name: cls.map_val_to_name(service_flag) }
|
||||
except:
|
||||
pass
|
||||
|
||||
class SuciActive(EfUstServiceParam):
|
||||
"""EF-UST service nr 124: enable or disable the SUCI service."""
|
||||
service_idx = 124
|
||||
name = '5G-SUCI-active'
|
||||
value_map = { 'SUCI-on': True, 'SUCI-off': False }
|
||||
example_input = sorted(value_map.keys())[0]
|
||||
|
||||
class SuciInUsim(EfUstServiceParam):
|
||||
"""EF-UST service nr 125: calculate SUCI in UE or in USIM"""
|
||||
service_idx = 125
|
||||
name = '5G-SUCI-in-USIM'
|
||||
value_map = { 'SUCI-in-UE': False, 'SUCI-in-USIM': True }
|
||||
example_input = sorted(value_map.keys())[0]
|
||||
|
||||
class SuciRi(ConfigurableParameter):
|
||||
"""SUCI Routing Indicator as in section 4.4.11.11 of 3GPP TS 31.102"""
|
||||
name = '5G-SUCI-RI'
|
||||
allow_chars = '0123456789'
|
||||
min_len = 1
|
||||
max_len = 4
|
||||
allow_types = (str,)
|
||||
example_input = '0'
|
||||
default_source = param_source.ConstantSource
|
||||
|
||||
KEY_RI = "routing_indicator"
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
for pe in pes.get_pes_for_type('df-5gs'):
|
||||
f_ri = pe.files.get('ef-routing-indicator', None)
|
||||
if f_ri is None:
|
||||
continue
|
||||
ef_ri = EF_Routing_Indicator()
|
||||
ri = ef_ri.decode_bin(f_ri.body)
|
||||
|
||||
ri[cls.KEY_RI] = str(val)
|
||||
|
||||
f_ri.body = ef_ri.encode_bin(ri)
|
||||
pe.file2pe(f_ri)
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
for pe in pes.get_pes_for_type('df-5gs'):
|
||||
f_ri = pe.files.get('ef-routing-indicator', None)
|
||||
if f_ri is None:
|
||||
continue
|
||||
ef_ri = EF_Routing_Indicator()
|
||||
try:
|
||||
ri = ef_ri.decode_bin(f_ri.body)
|
||||
yield { cls.name: ri.get(cls.KEY_RI) }
|
||||
except:
|
||||
pass
|
||||
|
||||
class SuciCalcInfo(ConfigurableParameter):
|
||||
"""SUCI Calculation Information as in section 4.4.11.8 of 3GPP TS 31.102"""
|
||||
name = '5G-SUCI-CalcInfo'
|
||||
example_input = '{}'
|
||||
default_source = param_source.ConstantSource
|
||||
allow_types = (str,)
|
||||
max_len = 2000
|
||||
|
||||
@classmethod
|
||||
def validate_val(cls, val):
|
||||
val = super().validate_val(val)
|
||||
|
||||
if not val:
|
||||
raise ValueError("SUCI Calc Info value is empty -- should at least be an empty dict like '{}'")
|
||||
|
||||
# check that it is a dict something like
|
||||
# {
|
||||
# "prot_scheme_id_list": [
|
||||
# {"priority": 0, "identifier": 2, "key_index": 1},
|
||||
# {"priority": 1, "identifier": 1, "key_index": 2},
|
||||
# ],
|
||||
# "hnet_pubkey_list": [
|
||||
# {"hnet_pubkey_identifier": 27,
|
||||
# "hnet_pubkey": "0472DA71976234CE833A6907425867B82E074D44EF907DFB4B3E21C1C2256EBCD15A7DED52FCBB097A4ED250E036C7B9C8C7004C4EEDC4F068CD7BF8D3F900E3B4"},
|
||||
# {"hnet_pubkey_identifier": 30,
|
||||
# "hnet_pubkey": "5A8D38864820197C3394B92613B20B91633CBD897119273BF8E4A6F4EEC0A650"},
|
||||
# ],
|
||||
# }
|
||||
|
||||
try:
|
||||
d = json.loads(val)
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
raise ValueError(f"Cannot parse SUCI Calc Info: {e}") from e
|
||||
|
||||
KEY_PSI_LIST = 'prot_scheme_id_list'
|
||||
KEY_HPK_LIST = 'hnet_pubkey_list'
|
||||
KEYS_D = set((KEY_HPK_LIST, KEY_PSI_LIST))
|
||||
KEYS_PSI = set(('identifier', 'key_index', 'priority'))
|
||||
KEYS_HPK = set(('hnet_pubkey_identifier', 'hnet_pubkey'))
|
||||
|
||||
if not (isinstance(d, dict)
|
||||
and set(d.keys()) == KEYS_D):
|
||||
raise ValueError(f"Unexpected structure in SUCI Calc Info: expected dict with entries {KEYS_D}")
|
||||
|
||||
psi = d.get(KEY_PSI_LIST, None)
|
||||
if not all((set(e.keys()) == KEYS_PSI) for e in psi):
|
||||
raise ValueError("Unexpected structure in SUCI Calc Info:"
|
||||
f" in {KEY_PSI_LIST}, expected dict with entries {KEYS_PSI}")
|
||||
|
||||
hpk = d.get(KEY_HPK_LIST, None)
|
||||
if not all((set(e.keys()) == KEYS_HPK) for e in hpk):
|
||||
raise ValueError("Unexpected structure in SUCI Calc Info:"
|
||||
f" in {KEY_HPK_LIST}, expected dict with entries {KEYS_HPK}")
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def _apply_suci(cls, pes: ProfileElementSequence, val, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
|
||||
for pe in pes.get_pes_for_type(pe_type):
|
||||
f_sucici = pe.files.get(pe_file, None)
|
||||
if not f_sucici:
|
||||
continue
|
||||
ef_sucici = EF_SUCI_Calc_Info()
|
||||
f_sucici.body = ef_sucici.encode_bin(val)
|
||||
pe.file2pe(f_sucici)
|
||||
|
||||
@classmethod
|
||||
def apply_val(cls, pes: ProfileElementSequence, val):
|
||||
cls._apply_suci(pes, val, "df-5gs", "ef-suci-calc-info")
|
||||
cls._apply_suci(pes, val, "df-saip", "ef-suci-calc-info-usim")
|
||||
|
||||
@classmethod
|
||||
def _get_suci(cls, pes: ProfileElementSequence, pe_type="df-5gs", pe_file="ef-suci-calc-info"):
|
||||
for pe in pes.get_pes_for_type(pe_type):
|
||||
f_sucici = pe.files.get(pe_file, None)
|
||||
if not f_sucici:
|
||||
continue
|
||||
ef_sucici = EF_SUCI_Calc_Info()
|
||||
sucici = ef_sucici.decode_bin(f_sucici.body)
|
||||
|
||||
# normalize to string (bytes cannot go into json)
|
||||
for hnet_pubkey in sucici.get('hnet_pubkey_list', ()):
|
||||
val = hnet_pubkey['hnet_pubkey']
|
||||
if isinstance(val, bytes):
|
||||
val = b2h(val)
|
||||
hnet_pubkey['hnet_pubkey'] = val
|
||||
|
||||
yield { cls.name: json.dumps(sucici) }
|
||||
|
||||
@classmethod
|
||||
def get_values_from_pes(cls, pes: ProfileElementSequence):
|
||||
yield from cls._get_suci(pes, "df-5gs", "ef-suci-calc-info")
|
||||
yield from cls._get_suci(pes, "df-saip", "ef-suci-calc-info-usim")
|
||||
|
||||
Reference in New Issue
Block a user