start the transition to sqlite3 for storage

This commit is contained in:
Tony Blyler 2021-11-08 01:36:52 -05:00
parent fe02b11951
commit bba899677f
8 changed files with 326 additions and 51 deletions

1
.gitignore vendored
View file

@ -0,0 +1 @@
*/__pycache__

4
db/__init__.py Normal file
View file

@ -0,0 +1,4 @@
from db.migration import migrate_db
from db.conn import create_conn
__all__ = ["migrate_db", "create_conn"]

8
db/conn.py Normal file
View file

@ -0,0 +1,8 @@
import sqlite3
def create_conn(db_path: str) -> sqlite3.Connection:
db_conn = sqlite3.connect(db_path)
db_conn.execute("PRAGMA foreign_keys = ON")
return db_conn

0
db/message.py Normal file
View file

64
db/migration.py Normal file
View file

@ -0,0 +1,64 @@
import logging
import sqlite3
__STEPS = [
"""
BEGIN;
CREATE TABLE "transaction" (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
email_message_id TEXT NOT NULL CHECK (typeof(email_message_id) = 'text'),
amount TEXT NOT NULL CHECK (typeof(amount) = 'text'),
card_ending_in TEXT NOT NULL CHECK (typeof(card_ending_in) = 'text'),
merchant TEXT NOT NULL CHECK (typeof(merchant) = 'text'),
date TEXT NOT NULL CHECK (typeof(date) = 'text'),
time TEXT NOT NULL CHECK (typeof(time) = 'text'),
acknowledged BOOLEAN NOT NULL CHECK (typeof(acknowledged) = 'integer'),
ts_update INTEGER NOT NULL CHECK (typeof(ts_update) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer')
);
CREATE UNIQUE INDEX ix__transaction__email_message_id ON
"transaction" (email_message_id);
CREATE INDEX ix__transaction__acknowledged ON
"transaction" (acknowledged);
CREATE TABLE notification (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
id_transaction INTEGER NOT NULL CHECK (typeof(id_transaction) = 'integer'),
pushover_receipt TEXT NOT NULL CHECK (typeof(pushover_receipt) = 'text'),
acknowledged BOOLEAN NOT NULL CHECK (typeof(acknowledged) = 'integer'),
expired BOOLEAN NOT NULL CHECK (typeof(expired) = 'integer'),
ts_update INTEGER NOT NULL CHECK (typeof(ts_update) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer'),
FOREIGN KEY (id_transaction) REFERENCES "transaction" (id)
);
CREATE INDEX ix__notification__id_transaction ON
notification (id_transaction);
CREATE TABLE schema_version (
id INTEGER PRIMARY KEY NOT NULL CHECK (typeof(id) = 'integer'),
ts_insert INTEGER NOT NULL CHECK (typeof(ts_insert) = 'integer')
);
INSERT INTO schema_version (id, ts_insert) VALUES (1, strftime('%s'));
COMMIT;
"""
]
def migrate_db(log: logging.Logger, db_conn: sqlite3.Connection) -> None:
schema_verison = 0
try:
log.info("checking latest schema_version")
result = db_conn.execute("SELECT MAX(id) FROM schema_version").fetchone()
if result is not None:
schema_verison = result[0]
log.info(f"schema version is {schema_verison}")
except Exception:
log.info(f"missing schema_version table, assuming {schema_verison}")
for step_number, step in enumerate(__STEPS[schema_verison:], start=1):
log.info(f"applying schema_version {step_number}")
db_conn.executescript(step)
log.info(f"successfully applied schema_version {step_number}")

65
db/notification.py Normal file
View file

@ -0,0 +1,65 @@
from dataclasses import dataclass
from datetime import datetime
import logging
import sqlite3
from db import conn
@dataclass(frozen=True)
class Notification:
id: int
id_transaction: int
pushover_receipt: str
acknowledged: bool
expired: bool
ts_update: datetime
ts_insert: datetime
class NotificationManager:
def __init__(self, log: logging.Logger, db_conn: sqlite3.Connection):
self.__log = log
self.__db_conn = db_conn
def insert_notification(self, notification: Notification) -> Notification:
now = datetime.now()
result = self.__db_conn.execute(
"""
INSERT INTO notification (
id_transaction,
pushover_receipt,
acknowledged,
expired,
ts_update,
ts_insert
) VALUES (
?,
?,
?,
?,
?,
?
) RETURNING id;
""",
(
notification.id_transaction,
notification.pushover_receipt,
notification.acknowledged,
notification.expired,
now.strftime("%s"),
now.strftime("%s"),
),
).fetchone()
self.__db_conn.commit()
return Notification(
id=result[0],
id_transaction=notification.id_transaction,
pushover_receipt=notification.pushover_receipt,
acknowledged=notification.acknowledged,
expired=notification.expired,
ts_update=now,
ts_insert=now,
)

115
db/transaction.py Normal file
View file

@ -0,0 +1,115 @@
from datetime import datetime
import logging
import sqlite3
from typing import Iterable, Optional
from dataclasses import dataclass
@dataclass(frozen=True)
class Transaction:
id: int
email_message_id: str
amount: str
card_ending_in: str
merchant: str
date: str
time: str
acknowledged: bool
ts_update: datetime
ts_insert: datetime
class TransactionManager:
def __init__(self, log: logging.Logger, db_conn: sqlite3.Connection):
self.__log = log
self.__db_conn = db_conn
def get_by_email_message_id(self, email_message_id: str) -> Optional[Transaction]:
result = self.__db_conn.execute(
"""
SELECT
id,
email_message_id,
amount,
card_ending_in,
merchant,
date,
time,
acknowledged,
ts_update,
ts_insert
FROM "transaction"
WHERE email_message_id = ?
""",
(email_message_id,),
).fetchone()
if result is None:
return None
return Transaction(
id=result[0],
email_message_id=result[1],
amount=result[2],
card_ending_in=result[3],
merchant=result[4],
date=result[5],
time=result[6],
acknowledged=result[7],
ts_update=result[8],
ts_insert=result[9],
)
def insert_transaction(self, transaction: Transaction) -> Transaction:
now = datetime.now()
result = self.__db_conn.execute(
"""
INSERT INTO "transaction" (
email_message_id,
amount,
card_ending_in,
merchant,
date,
time,
acknowledged,
ts_update,
ts_insert
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?
) RETURNING id
""",
(
transaction.email_message_id,
transaction.amount,
transaction.card_ending_in,
transaction.merchant,
transaction.date,
transaction.time,
transaction.acknowledged,
int(now.strftime("%s")),
int(now.strftime("%s")),
),
).fetchone()
self.__db_conn.commit()
return Transaction(
id=result[0],
email_message_id=transaction.email_message_id,
amount=transaction.amount,
card_ending_in=transaction.card_ending_in,
merchant=transaction.merchant,
date=transaction.date,
time=transaction.time,
acknowledged=transaction.acknowledged,
ts_update=now,
ts_insert=now,
)

120
main.py
View file

@ -1,15 +1,20 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sqlite3
import json import json
import imaplib import imaplib
import logging import logging
import os import os
import email import email
from html.parser import HTMLParser from html.parser import HTMLParser
from typing import Any, Iterable, List, Set from typing import Any, Callable, Iterable, List, Set
import http.client import http.client
import urllib import urllib
from datetime import datetime, timedelta from datetime import datetime, timedelta
from db import migrate_db, create_conn
from db.notification import Notification, NotificationManager
from db.transaction import Transaction, TransactionManager
__logger = None __logger = None
@ -24,26 +29,9 @@ def get_logger() -> logging.Logger:
return __logger return __logger
class Transaction:
def __init__(self, message_id="", amount="", card_ending_in="", merchant="", date="", time=""):
self.message_id = message_id
self.amount = amount
self.card_ending_in = card_ending_in
self.merchant = merchant
self.date = date
self.time = time
def all_set(self) -> bool:
for val in self.__dict__.values():
if not val:
return False
return True
class MyHTMLParser(HTMLParser): class MyHTMLParser(HTMLParser):
def __init__(self): def __init__(self):
self.output = Transaction() self.__output = {}
self.__start_tds = 0 self.__start_tds = 0
self.__processing_card_ending_in = False self.__processing_card_ending_in = False
self.__processing_merchant = False self.__processing_merchant = False
@ -51,6 +39,24 @@ class MyHTMLParser(HTMLParser):
self.__processing_time = False self.__processing_time = False
super().__init__() super().__init__()
def output(self, email_message_id: str) -> Transaction:
transaction = Transaction(
id=0,
acknowledged=False,
amount=self.__output["amount"],
card_ending_in=self.__output["card_ending_in"],
date=self.__output["date"],
email_message_id=email_message_id,
merchant=self.__output["merchant"],
time=self.__output["time"],
ts_insert=datetime.now(),
ts_update=datetime.now(),
)
self.__output = {}
return transaction
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
self.__start_tds += 1 self.__start_tds += 1
@ -59,7 +65,7 @@ class MyHTMLParser(HTMLParser):
def handle_data(self, data: str) -> None: def handle_data(self, data: str) -> None:
if self.__start_tds > 0: if self.__start_tds > 0:
if self.output.all_set(): if len(self.__output) == 5:
return return
data = data.strip() data = data.strip()
@ -68,26 +74,26 @@ class MyHTMLParser(HTMLParser):
if self.__processing_card_ending_in: if self.__processing_card_ending_in:
self.__processing_card_ending_in = False self.__processing_card_ending_in = False
self.output.card_ending_in = data self.__output["card_ending_in"] = data
return return
if self.__processing_merchant: if self.__processing_merchant:
self.__processing_merchant = False self.__processing_merchant = False
self.output.merchant = data self.__output["merchant"] = data
return return
if self.__processing_date: if self.__processing_date:
self.__processing_date = False self.__processing_date = False
self.output.date = data self.__output["date"] = data
return return
if self.__processing_time: if self.__processing_time:
self.__processing_time = False self.__processing_time = False
self.output.time = data self.__output["time"] = data
return return
if data.startswith("Amount: $"): if data.startswith("Amount: $"):
self.output.amount = data.removeprefix("Amount: ") self.__output["amount"] = data.removeprefix("Amount: ")
return return
if data == "Card Ending In": if data == "Card Ending In":
@ -159,7 +165,7 @@ def get_transactions(
imap_user: str, imap_user: str,
imap_password: str, imap_password: str,
imap_mailbox: str, imap_mailbox: str,
ignore_message_ids: Set[str], ignore_message_id_callback: Callable[[str], bool],
) -> Iterable[Transaction]: ) -> Iterable[Transaction]:
log = get_logger() log = get_logger()
log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL") log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL")
@ -192,7 +198,7 @@ def get_transactions(
msg = email.message_from_bytes(email_data[1]) msg = email.message_from_bytes(email_data[1])
msg_id = msg.get("message-id") msg_id = msg.get("message-id")
if msg_id in ignore_message_ids: if ignore_message_id_callback(msg_id):
log.debug(f"ignoring message id {msg_id}") log.debug(f"ignoring message id {msg_id}")
continue continue
@ -208,43 +214,55 @@ def get_transactions(
body = msg.get_payload(decode=True) body = msg.get_payload(decode=True)
parser = MyHTMLParser() parser = MyHTMLParser()
parser.output.message_id = msg_id
parser.feed(str(body, "utf-8")) parser.feed(str(body, "utf-8"))
yield parser.output yield parser.output(msg_id)
if __name__ == "__main__": if __name__ == "__main__":
log = get_logger() log = get_logger()
db_conn = create_conn(os.environ["DB_FILE_PATH"])
migrate_db(log, db_conn)
transaction_manager = TransactionManager(log, db_conn)
notification_manager = NotificationManager(log, db_conn)
pushover_token = os.environ["PUSHOVER_TOKEN"] pushover_token = os.environ["PUSHOVER_TOKEN"]
pushover_user = os.environ["PUSHOVER_USER"] pushover_user = os.environ["PUSHOVER_USER"]
with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file: with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file:
imap_password = password_file.read() imap_password = password_file.read()
message_id_ignore_set: Set[str] = set() transactions = get_transactions(
with open(os.environ["MESSAGE_ID_LIST"], "a+") as message_id_file: imap_host=os.environ["IMAP_HOST"],
message_id_file.seek(0, 0) imap_port=int(os.environ["IMAP_PORT"]),
for message_id in message_id_file: imap_user=os.environ["IMAP_USER"],
message_id_ignore_set.add(message_id.strip()) imap_password=imap_password,
imap_mailbox=os.environ["IMAP_MAILBOX"],
transactions = get_transactions( ignore_message_id_callback=lambda email_message_id: transaction_manager.get_by_email_message_id(
imap_host=os.environ["IMAP_HOST"], email_message_id
imap_port=int(os.environ["IMAP_PORT"]),
imap_user=os.environ["IMAP_USER"],
imap_password=imap_password,
imap_mailbox=os.environ["IMAP_MAILBOX"],
ignore_message_ids=message_id_ignore_set,
) )
is not None,
)
count = 0 count = 0
for transaction in transactions: for transaction in transactions:
count += 1 count += 1
log.info(f"got message id {transaction.message_id}: {json.dumps(transaction.__dict__)}") log.info(f"got message id {transaction.email_message_id}: {transaction.__dict__}")
log.debug(f"sending pushover notification for message id {transaction.message_id}") log.debug(f"recording message id {transaction.email_message_id} to message id list")
send_pushover(pushover_token, pushover_user, transaction) transaction = transaction_manager.insert_transaction(transaction)
log.debug(f"recording message id {transaction.message_id} to message id list") log.debug(f"sending pushover notification for message id {transaction.email_message_id}")
message_id_file.writelines([transaction.message_id, "\n"]) notification_manager.insert_notification(
Notification(
id=0,
id_transaction=transaction.id,
pushover_receipt="foobar",
acknowledged=False,
expired=False,
ts_insert=0,
ts_update=0,
)
)
# send_pushover(pushover_token, pushover_user, transaction)
log.info(f"recorded {count} transactions") log.info(f"recorded {count} transactions")