card_key_provider: separate and refactor CSV column encryption

The CardKeyProviderCsv class implements a column decryption scheme
where columns are protected using a transport key. The CSV files
are enrcypted using contrib/csv-encrypt-columns.py.

The current implementation has two main problems:

- The decryption code in CardKeyProviderCsv is not specific to CSV files.
  It could be re-used in other formats, for example to decrypt columns
  (fields) red from a database. So let's split the decryption code in a
  separate class.

- The encryption code in csv-encrypt-columns.py accesses methods and
  properties in CardKeyProviderCsv. Also having the coresponding
  encryption code somewhere out of tree may be confusing. Let's improve
  the design and put encryption and decryption functions in a single
  class. Let's also make sure the encryption/decryption is covered by
  unittests.

Related: SYS#7725
Change-Id: I180457d4938f526d227c81020e4e03c6b3a57dab
This commit is contained in:
Philipp Maier
2025-11-17 16:36:17 +01:00
parent 08565e8a98
commit 4550574e03
3 changed files with 165 additions and 60 deletions

View File

@@ -24,20 +24,12 @@ import argparse
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h, Hexstr from osmocom.utils import h2b, b2h, Hexstr
from pySim.card_key_provider import CardKeyProviderCsv from pySim.card_key_provider import CardKeyFieldCryptor
def dict_keys_to_upper(d: dict) -> dict: class CsvColumnEncryptor(CardKeyFieldCryptor):
return {k.upper():v for k,v in d.items()}
class CsvColumnEncryptor:
def __init__(self, filename: str, transport_keys: dict): def __init__(self, filename: str, transport_keys: dict):
self.filename = filename self.filename = filename
self.transport_keys = dict_keys_to_upper(transport_keys) self.crypt = CardKeyFieldCryptor(transport_keys)
def encrypt_col(self, colname:str, value: str) -> Hexstr:
key = self.transport_keys[colname]
cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
return b2h(cipher.encrypt(h2b(value)))
def encrypt(self) -> None: def encrypt(self) -> None:
with open(self.filename, 'r') as infile: with open(self.filename, 'r') as infile:
@@ -49,9 +41,8 @@ class CsvColumnEncryptor:
cw.writeheader() cw.writeheader()
for row in cr: for row in cr:
for key_colname in self.transport_keys: for fieldname in cr.fieldnames:
if key_colname in row: row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname])
row[key_colname] = self.encrypt_col(key_colname, row[key_colname])
cw.writerow(row) cw.writerow(row)
if __name__ == "__main__": if __name__ == "__main__":
@@ -71,9 +62,5 @@ if __name__ == "__main__":
print("You must specify at least one key!") print("You must specify at least one key!")
sys.exit(1) sys.exit(1)
csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys)
for name, key in csv_column_keys.items():
print("Encrypting column %s using AES key %s" % (name, key))
cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys) cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
cce.encrypt() cce.encrypt()

View File

@@ -10,7 +10,7 @@ the need of manually entering the related card-individual data on every
operation with pySim-shell. operation with pySim-shell.
""" """
# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH # (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
# All Rights Reserved # All Rights Reserved
# #
# Author: Philipp Maier, Harald Welte # Author: Philipp Maier, Harald Welte
@@ -31,16 +31,29 @@ operation with pySim-shell.
from typing import List, Dict, Optional from typing import List, Dict, Optional
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from osmocom.utils import h2b, b2h from osmocom.utils import h2b, b2h
from pySim.log import PySimLogger
import abc import abc
import csv import csv
import logging
log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider'] card_key_providers = [] # type: List['CardKeyProvider']
class CardKeyFieldCryptor:
"""
A Card key field encryption class that may be used by Card key provider implementations to add support for
a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.).
The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key"
before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt
the key material on demand.
"""
# well-known groups of columns relate to a given functionality. This avoids having # well-known groups of columns relate to a given functionality. This avoids having
# to specify the same transport key N number of times, if the same key is used for multiple # to specify the same transport key N number of times, if the same key is used for multiple
# fields of one group, like KIC+KID+KID of one SD. # fields of one group, like KIC+KID+KID of one SD.
CRYPT_GROUPS = { __CRYPT_GROUPS = {
'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'], 'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'], 'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'], 'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
@@ -48,6 +61,74 @@ CRYPT_GROUPS = {
'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'], 'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
} }
__IV = b'\x23' * 16
@staticmethod
def __dict_keys_to_upper(d: dict) -> dict:
return {k.upper():v for k,v in d.items()}
@staticmethod
def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
new_dict = {}
for name, key in transport_keys.items():
if name in crypt_groups:
for field in crypt_groups[name]:
new_dict[field] = key
else:
new_dict[name] = key
return new_dict
def __init__(self, transport_keys: dict):
"""
Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases
it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS)
Args:
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
"""
self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
self.__CRYPT_GROUPS)
for name, key in self.transport_keys.items():
log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key))
def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""
Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as plaintext and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return encrypted_val
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.decrypt(h2b(encrypted_val)))
def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
"""
Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided
field name, otherwise the field is treated as non sensitive and passed through as it is.
Args:
field_name : name of the field to decrypt (used to identify which key to use)
encrypted_val : encrypted field value
Returns:
plaintext field value
"""
if not field_name.upper() in self.transport_keys:
return plaintext_val
cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV)
return b2h(cipher.encrypt(h2b(plaintext_val)))
class CardKeyProvider(abc.ABC): class CardKeyProvider(abc.ABC):
"""Base class, not containing any concrete implementation.""" """Base class, not containing any concrete implementation."""
@@ -89,13 +170,9 @@ class CardKeyProvider(abc.ABC):
dictionary of {field, value} strings for each requested field from 'fields' dictionary of {field, value} strings for each requested field from 'fields'
""" """
class CardKeyProviderCsv(CardKeyProvider): class CardKeyProviderCsv(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified CSV file. """Card key provider implementation that allows to query against a specified CSV file."""
Supports column-based encryption as it is generally a bad idea to store cryptographic key material in
plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also
known as "transport key" (see GSMA FS.28)."""
IV = b'\x23' * 16
csv_file = None csv_file = None
filename = None filename = None
@@ -103,35 +180,13 @@ class CardKeyProviderCsv(CardKeyProvider):
""" """
Args: Args:
filename : file name (path) of CSV file containing card-individual key/data filename : file name (path) of CSV file containing card-individual key/data
transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the transport_keys : (see class CardKeyFieldCryptor)
respective field (column) of the CSV. This is done so that different fields
(columns) can use different transport keys, which is strongly recommended by
GSMA FS.28
""" """
self.csv_file = open(filename, 'r') self.csv_file = open(filename, 'r')
if not self.csv_file: if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % filename) raise RuntimeError("Could not open CSV file '%s'" % filename)
self.filename = filename self.filename = filename
self.transport_keys = self.process_transport_keys(transport_keys) self.crypt = CardKeyFieldCryptor(transport_keys)
@staticmethod
def process_transport_keys(transport_keys: dict):
"""Apply a single transport key to multiple fields/columns, if the name is a group."""
new_dict = {}
for name, key in transport_keys.items():
if name in CRYPT_GROUPS:
for field in CRYPT_GROUPS[name]:
new_dict[field] = key
else:
new_dict[name] = key
return new_dict
def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
"""decrypt a single field, if we have a transport key for the field of that name."""
if not field_name in self.transport_keys:
return encrypted_val
cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV)
return b2h(cipher.decrypt(h2b(encrypted_val)))
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]: def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
super()._verify_get_data(fields, key, value) super()._verify_get_data(fields, key, value)
@@ -147,7 +202,7 @@ class CardKeyProviderCsv(CardKeyProvider):
if row[key] == value: if row[key] == value:
for f in fields: for f in fields:
if f in row: if f in row:
rc.update({f: self._decrypt_field(f, row[f])}) rc.update({f: self.crypt.decrypt_field(f, row[f])})
else: else:
raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.filename, f)) raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.filename, f))
return rc return rc

View File

@@ -4,7 +4,7 @@ import unittest
import os import os
from pySim.card_key_provider import * from pySim.card_key_provider import *
class TestCardKeyProvider(unittest.TestCase): class TestCardKeyProviderCsv(unittest.TestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F", column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
@@ -76,5 +76,68 @@ class TestCardKeyProvider(unittest.TestCase):
result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID')) result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID'))
self.assertEqual(result, t.get('EXPECTED')) self.assertEqual(result, t.get('EXPECTED'))
class TestCardKeyFieldCryptor(unittest.TestCase):
def __init__(self, *args, **kwargs):
transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
"OPC" : "000102030405065545645645645D0E0F",
"KIC1" : "06410203546406456456450B0C0D0E0F",
"UICC_SCP03" : "00040267840507667609045645645E0F"}
self.crypt = CardKeyFieldCryptor(transport_keys)
super().__init__(*args, **kwargs)
def test_encrypt_field(self):
test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
def test_decrypt_field(self):
test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"},
{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}]
for t in test_data:
result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL'))
self.assertEqual(result, t.get('EXPECTED'))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()