# citi-alerts/db/migration.py
#
# 66 lines
# 3.1 KiB
# Python

import logging
import sqlite3
# Ordered schema migration scripts, one SQL script per schema version.
#
# migrate_db() reads MAX(id) from schema_version, slices this list from that
# index and runs every remaining entry with executescript(), so the entry at
# position i is the script that produces schema version i + 1. Each script
# records its own version number in schema_version and is wrapped in an
# explicit BEGIN/COMMIT.
#
# NOTE(review): because the slice is positional, new steps presumably have to
# be appended at the end and existing entries left in place -- confirm before
# editing an already-released step.
__STEPS = [
# Step 1 (initial schema):
#   * "transaction"  -- one row per alert e-mail (unique index on
#     email_message_id, plain index on acknowledged). The name is quoted
#     because TRANSACTION is an SQL keyword.
#   * notification   -- rows referencing "transaction" (id) through
#     id_transaction, indexed on that column.
#   * schema_version -- bookkeeping table; this step inserts id 1.
# Every column carries a typeof() CHECK on top of its declared type.
"""
BEGIN;
CREATE TABLE "transaction" (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
email_message_id TEXT NOT NULL CHECK (typeof(email_message_id) = 'text'),
amount TEXT NOT NULL CHECK (typeof(amount) = 'text'),
card_ending_in TEXT NOT NULL CHECK (typeof(card_ending_in) = 'text'),
merchant TEXT NOT NULL CHECK (typeof(merchant) = 'text'),
date TEXT NOT NULL CHECK (typeof(date) = 'text'),
time TEXT NOT NULL CHECK (typeof(time) = 'text'),
acknowledged BOOLEAN NOT NULL CHECK (typeof(acknowledged) = 'integer'),
ts_update INTEGER NOT NULL CHECK (typeof(ts_update) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer')
);
CREATE UNIQUE INDEX ix__transaction__email_message_id ON
"transaction" (email_message_id);
CREATE INDEX ix__transaction__acknowledged ON
"transaction" (acknowledged);
CREATE TABLE notification (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
id_transaction INTEGER NOT NULL CHECK (typeof(id_transaction) = 'integer'),
user TEXT NOT NULL CHECK (typeof(user) = 'text'),
pushover_receipt TEXT NOT NULL CHECK (typeof(pushover_receipt) = 'text'),
acknowledged BOOLEAN NOT NULL CHECK (typeof(acknowledged) = 'integer'),
expired BOOLEAN NOT NULL CHECK (typeof(expired) = 'integer'),
ts_update INTEGER NOT NULL CHECK (typeof(ts_update) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer'),
FOREIGN KEY (id_transaction) REFERENCES "transaction" (id)
);
CREATE INDEX ix__notification__id_transaction ON
notification (id_transaction);
CREATE TABLE schema_version (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer')
);
INSERT INTO schema_version (id, ts_insert) VALUES (1, strftime('%s'));
COMMIT;
"""
]
def migrate_db(log: logging.Logger, db_conn: sqlite3.Connection) -> None:
    """Apply every migration step in ``__STEPS`` that the database lacks.

    The current version is the largest ``id`` stored in the
    ``schema_version`` table. A database without that table, or with the
    table still empty, is treated as version 0. Steps are applied in order,
    starting with the one at index ``current version`` in ``__STEPS``.

    Args:
        log: Logger that receives progress messages.
        db_conn: Open SQLite connection to migrate.

    Raises:
        sqlite3.Error: If the version lookup fails for any reason other than
            the ``schema_version`` table being absent, or if a migration
            script fails. A failed script is rolled back before the error
            is re-raised.
    """
    schema_version = 0

    log.info("checking latest schema_version")
    try:
        result = db_conn.execute("SELECT MAX(id) FROM schema_version").fetchone()
    except sqlite3.OperationalError as exc:
        # Only an absent table means "fresh database". Anything else
        # (locked file, I/O error, ...) must not be mistaken for version 0.
        if "no such table" not in str(exc):
            raise
        log.info(f"missing schema_version table, assuming {schema_version}")
    else:
        # MAX() over an empty table yields one row holding NULL, so the
        # value has to be checked as well as the row.
        if result is not None and result[0] is not None:
            schema_version = result[0]
        log.info(f"schema version is {schema_version}")

    # Number the steps from the version they produce so the log lines stay
    # correct when resuming from a non-zero version.
    pending = enumerate(__STEPS[schema_version:], start=schema_version + 1)
    for step_number, step in pending:
        log.info(f"applying schema_version {step_number}")
        try:
            db_conn.executescript(step)
        except sqlite3.Error:
            # A script that fails after its BEGIN leaves the transaction
            # open. Discard it so a later commit cannot persist a
            # half-applied migration.
            if db_conn.in_transaction:
                db_conn.rollback()
            raise
        log.info(f"successfully applied schema_version {step_number}")