card_key_provider: add PostgreSQL support

The Card Key Provider currently only has support for CSV files
as input. Unfortunately using CSV files does not scale very well
when the card inventory is very large and continously updated.
In this case a centralized storage in the form of a database
is the more suitable approach.

This patch adds PostgreSQL support next to the existing CSV
file support. It also adds an importer tool to import existing
CSV files into the database.

Change-Id: Icba625c02a60d7e1f519b506a46bda5ded0537d3
Related: SYS#7725
This commit is contained in:
Philipp Maier
2025-11-21 17:45:21 +01:00
parent eb7c5d85d0
commit fddab8639f
5 changed files with 587 additions and 10 deletions

View File

@@ -36,6 +36,7 @@ from pySim.log import PySimLogger
import abc
import csv
import logging
import yaml
log = PySimLogger.get("CARDKEY")
@@ -159,6 +160,7 @@ class CardKeyProviderCsv(CardKeyProvider):
csv_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
"""
log.info("Using CSV file as card key data source: %s" % csv_filename)
self.csv_file = open(csv_filename, 'r')
if not self.csv_file:
raise RuntimeError("Could not open CSV file '%s'" % csv_filename)
@@ -186,6 +188,69 @@ class CardKeyProviderCsv(CardKeyProvider):
return None
return return_dict
class CardKeyProviderPgsql(CardKeyProvider):
"""Card key provider implementation that allows to query against a specified PostgreSQL database table."""
def __init__(self, config_filename: str, transport_keys: dict):
"""
Args:
config_filename : file name (path) of CSV file containing card-individual key/data
transport_keys : (see class CardKeyFieldCryptor)
"""
import psycopg2
log.info("Using SQL database as card key data source: %s" % config_filename)
with open(config_filename, "r") as cfg:
config = yaml.load(cfg, Loader=yaml.FullLoader)
log.info("Card key database name: %s" % config.get('db_name'))
db_users = config.get('db_users')
user = db_users.get('reader')
if user is None:
raise ValueError("user for role 'reader' not set up in config file.")
self.conn = psycopg2.connect(dbname=config.get('db_name'),
user=user.get('name'),
password=user.get('pass'),
host=config.get('host'))
self.tables = config.get('table_names')
log.info("Card key database tables: %s" % str(self.tables))
self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
import psycopg2
from psycopg2.sql import Identifier, SQL
db_result = None
for t in self.tables:
self.conn.rollback()
cur = self.conn.cursor()
# Make sure that the database table and the key column actually exists. If not, move on to the next table
cur.execute("SELECT column_name FROM information_schema.columns where table_name = %s;", (t,))
cols_result = cur.fetchall()
if cols_result == []:
log.warning("Card Key database seems to lack table %s, check config file!" % t)
continue
if (key.lower(),) not in cols_result:
continue
# Query requested columns from database table
query = SQL("SELECT {}").format(Identifier(fields[0].lower()))
for f in fields[1:]:
query += SQL(", {}").format(Identifier(f.lower()))
query += SQL(" FROM {} WHERE {} = %s LIMIT 1;").format(Identifier(t.lower()),
Identifier(key.lower()))
cur.execute(query, (value,))
db_result = cur.fetchone()
cur.close()
if db_result:
break
if db_result is None:
return None
result = dict(zip(fields, db_result))
for k in result.keys():
result[k] = self.crypt.decrypt_field(k, result.get(k))
return result
def card_key_provider_register(provider: CardKeyProvider, provider_list=card_key_providers):