citi-alerts/main.py

314 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
import sqlite3
import json
import imaplib
import logging
import os
import email
from html.parser import HTMLParser
from typing import Any, Callable, Iterable, List, Set
import http.client
import urllib
from datetime import datetime, timedelta
from db import migrate_db, create_conn
from db.notification import Notification, NotificationManager
from db.transaction import Transaction, TransactionManager
# Value, in seconds, sent as the "expire" field of the Pushover message request
# built in send_pushover().
__MAX_PUSHOVER_EXPIRATION = 10800 # 3 hours, max allowed by Pushover's API
# Lazily-created logger shared by the whole script; populated by get_logger().
__logger = None


def get_logger() -> logging.Logger:
    """Return the shared "citi-alerts" logger, configuring logging on first use.

    The level comes from the ``LOG_LEVEL`` environment variable, matched
    case-insensitively against the logging module's level names (``DEBUG``,
    ``INFO``, ...). An unset or unrecognised value falls back to ``INFO``.
    Subsequent calls return the same logger object without reconfiguring it.
    """
    global __logger
    if __logger is None:
        requested = os.environ.get("LOG_LEVEL", "").upper()
        # getLevelName() maps a known level *name* to its number and returns a
        # "Level ..." string for anything else, so a non-int result means the
        # name was not recognised. This avoids the private logging._nameToLevel.
        level = logging.getLevelName(requested)
        if not isinstance(level, int):
            level = logging.INFO
        logging.basicConfig(level=level)
        __logger = logging.getLogger("citi-alerts")
        __logger.setLevel(level)
    return __logger
class MyHTMLParser(HTMLParser):
    """Collect the fields of one transaction from an alert e-mail's HTML.

    Text nodes are inspected while at least one tag is open. The amount is
    read from a node starting with ``"Amount: $"``; every other field is the
    first non-blank text node that follows its label ("Card Ending In",
    "Merchant", "Date", "Time"). Once all five fields are captured, further
    text is ignored until output() is called.
    """

    # Label text -> key under which the *next* non-blank text node is stored.
    __FIELD_AFTER_LABEL = {
        "Card Ending In": "card_ending_in",
        "Merchant": "merchant",
        "Date": "date",
        "Time": "time",
    }

    def __init__(self):
        self.__output = {}
        # Net count of start tags minus end tags seen so far (all tags, not
        # only <td>, despite the name).
        self.__start_tds = 0
        # Key waiting for its value, or None when no label was just seen.
        self.__awaiting = None
        super().__init__()

    def output(self, email_message_id: str) -> Transaction:
        """Build a Transaction from the captured fields and reset the capture.

        Raises KeyError if any of the five fields was never captured; in that
        case the captured fields are left as they are.
        """
        captured = self.__output
        result = Transaction(
            id=0,
            acknowledged=False,
            email_message_id=email_message_id,
            ts_insert=datetime.now(),
            ts_update=datetime.now(),
            **{
                key: captured[key]
                for key in ("amount", "card_ending_in", "date", "merchant", "time")
            },
        )
        self.__output = {}
        return result

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        self.__start_tds += 1

    def handle_endtag(self, tag: str) -> None:
        self.__start_tds -= 1

    def handle_data(self, data: str) -> None:
        # Ignore text outside any open tag, and everything after the fifth
        # field has been captured.
        if self.__start_tds <= 0 or len(self.__output) == 5:
            return
        text = data.strip()
        if not text:
            return
        if self.__awaiting is not None:
            # The previous node was a label, so this node is its value.
            self.__output[self.__awaiting] = text
            self.__awaiting = None
        elif text.startswith("Amount: $"):
            self.__output["amount"] = text.removeprefix("Amount: ")
        else:
            # Either a known label (remember which field comes next) or
            # unrelated text (stays None).
            self.__awaiting = self.__FIELD_AFTER_LABEL.get(text)
def grouper(data: Iterable, group_size: int) -> Iterable[List]:
    """Yield the items of *data* as consecutive lists of up to *group_size*.

    Every yielded list is a fresh object, so callers may keep the groups
    (for example ``list(grouper(...))``) without later groups overwriting
    earlier ones. The final group may be shorter; empty input yields nothing.

    Raises:
        ValueError: if *group_size* is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if group_size < 1:
        raise ValueError("group_size must be >= 1")
    group = []
    for item in data:
        group.append(item)
        if len(group) == group_size:
            yield group
            # Start a new list instead of clearing the one just handed out.
            group = []
    if group:
        yield group
def send_pushover(notification_manager: NotificationManager, token: str, user: str, transaction: Transaction) -> dict[str, Any]:
    """Send a Pushover message for *transaction* and record the notification.

    The message is posted to ``/1/messages.json`` with priority 2, a retry
    interval of 3600 seconds and the maximum expiration. On success a
    Notification row holding the returned receipt is inserted through
    *notification_manager*.

    Args:
        notification_manager: store used to persist the new notification.
        token: Pushover application token.
        user: Pushover user key the message is sent to and recorded against.
        transaction: transaction described in the message text.

    Returns:
        The decoded JSON payload of Pushover's response.

    Raises:
        Exception: if Pushover answers with a status other than 200.
    """
    # Resolve the logger here rather than relying on a global that only
    # exists when this file runs as a script.
    log = get_logger()
    form = urllib.parse.urlencode(
        {
            "token": token,
            "user": user,
            "priority": "2",
            "retry": "3600",
            "expire": f"{__MAX_PUSHOVER_EXPIRATION}",
            "message": f'{transaction.amount} at "{transaction.merchant}" with card {transaction.card_ending_in} on {transaction.date} at {transaction.time}',
        }
    )
    conn = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request(
            "POST",
            "/1/messages.json",
            form,
            {"Content-type": "application/x-www-form-urlencoded"},
        )
        response = conn.getresponse()
        body = response.read()
        status = response.status
    finally:
        conn.close()
    if status != 200:
        raise Exception(f"failed to send notifcation via Pushover: {str(body, 'utf-8')}")
    response_payload = json.loads(body)
    notification = notification_manager.insert_notification(Notification(
        id=0,
        ts_insert=datetime.now(),
        ts_update=datetime.now(),
        acknowledged=False,
        expired=False,
        id_transaction=transaction.id,
        pushover_receipt=response_payload["receipt"],
        # Record the user the message was actually sent to.
        user=user,
    ))
    log.debug(f"recording notification id {notification.id}")
    return response_payload
def get_pushover_receipt_status(receipt: str, token: str) -> dict[str, Any]:
    """Fetch the status of a Pushover receipt.

    Args:
        receipt: receipt identifier returned when the message was sent.
        token: Pushover application token.

    Returns:
        The decoded JSON payload of Pushover's response. The HTTP status code
        is not checked here; callers inspect the payload's fields.
    """
    # Pushover documents the token as a query-string parameter for this GET
    # endpoint; a request body on GET is not reliably honoured.
    query = urllib.parse.urlencode({"token": token})
    path = f"/1/receipts/{urllib.parse.quote(receipt, safe='')}.json?{query}"
    conn = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request("GET", path)
        response = conn.getresponse()
        return json.loads(response.read())
    finally:
        conn.close()
def _decode_message_text(msg) -> str:
    """Return the decoded text of every payload-carrying part of *msg*.

    Each part is decoded with its declared charset (UTF-8 when none is
    declared or the declared one is unknown to Python). Undecodable bytes are
    replaced rather than raising, so one odd byte cannot abort the run.
    """
    chunks = []
    # walk() yields the message itself first, so this also covers
    # non-multipart messages.
    for part in msg.walk():
        payload = part.get_payload(decode=True)
        if payload is None:
            # Multipart containers carry no payload of their own.
            continue
        charset = part.get_content_charset() or "utf-8"
        try:
            chunks.append(payload.decode(charset, errors="replace"))
        except LookupError:
            chunks.append(payload.decode("utf-8", errors="replace"))
    return "".join(chunks)


def get_transactions(
    imap_host: str,
    imap_port: int,
    imap_user: str,
    imap_password: str,
    imap_mailbox: str,
    ignore_message_id_callback: Callable[[str], bool],
) -> Iterable[Transaction]:
    """Yield a Transaction for each matching alert e-mail of the last 7 days.

    Connects over IMAP with SSL, opens *imap_mailbox* read-only and searches
    for messages whose subject contains "transaction was made" dated within
    the last week. Messages are fetched in batches of up to 50 and parsed
    with MyHTMLParser.

    Args:
        imap_host: IMAP server host name.
        imap_port: IMAP server port.
        imap_user: login name.
        imap_password: login password.
        imap_mailbox: mailbox to search.
        ignore_message_id_callback: called with each message's Message-ID
            header value; the message is skipped when it returns True.

    Raises:
        Exception: if the IMAP search or fetch does not return "OK".
        KeyError: from MyHTMLParser.output() when a message lacks a field.
    """
    log = get_logger()
    log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL")
    with imaplib.IMAP4_SSL(host=imap_host, port=imap_port) as mail:
        log.info(f"attemping to sign in with {imap_user}")
        mail.login(user=imap_user, password=imap_password)
        log.info(f"selecting mailbox {imap_mailbox}")
        mail.select(mailbox=f'"{imap_mailbox}"', readonly=True)
        # NOTE(review): %b is locale-dependent while IMAP expects English
        # month abbreviations - confirm the process runs under a C/English locale.
        one_week_ago = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
        log.info(f"searching for Citi transaction e-mails as old as one week ago ({one_week_ago})")
        status, data = mail.search(None, f'SUBJECT "transaction was made" SINCE {one_week_ago}')
        if status != "OK":
            raise Exception(f"failed to search for Citi transaction e-mails: {status}")
        for email_ids in grouper(data[0].split(), 50):
            log.info("getting a batch of up to 50 emails")
            status, data = mail.fetch(
                ",".join(email_id.decode("utf-8") for email_id in email_ids),
                "(RFC822)",
            )
            if status != "OK":
                raise Exception(f"failed to fetch Citi transaction e-mails: {status}")
            for email_data in data:
                # Only tuple entries are processed; element 1 holds the raw
                # message bytes passed to the e-mail parser.
                if not isinstance(email_data, tuple):
                    continue
                msg = email.message_from_bytes(email_data[1])
                msg_id = msg.get("message-id")
                if ignore_message_id_callback(msg_id):
                    log.debug(f"ignoring message id {msg_id}")
                    continue
                parser = MyHTMLParser()
                parser.feed(_decode_message_text(msg))
                yield parser.output(msg_id)
if __name__ == "__main__":
    log = get_logger()

    # Open the database and run migrate_db before the managers use it.
    db_conn = create_conn(os.environ["DB_FILE_PATH"])
    migrate_db(log, db_conn)
    transaction_manager = TransactionManager(log, db_conn)
    notification_manager = NotificationManager(log, db_conn)

    pushover_token = os.environ["PUSHOVER_TOKEN"]
    pushover_user = os.environ["PUSHOVER_USER"]
    with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file:
        imap_password = password_file.read()

    def already_recorded(email_message_id: str) -> bool:
        """Return True when a transaction with this message id is already stored."""
        return transaction_manager.get_by_email_message_id(email_message_id) is not None

    new_transactions = get_transactions(
        imap_host=os.environ["IMAP_HOST"],
        imap_port=int(os.environ["IMAP_PORT"]),
        imap_user=os.environ["IMAP_USER"],
        imap_password=imap_password,
        imap_mailbox=os.environ["IMAP_MAILBOX"],
        ignore_message_id_callback=already_recorded,
    )

    # Phase 1: store each newly parsed transaction and send its notification.
    count = 0
    for count, parsed in enumerate(new_transactions, start=1):
        log.info(f"got message id {parsed.email_message_id}: {parsed.__dict__}")
        log.debug(f"recording message id {parsed.email_message_id} to message id list")
        stored = transaction_manager.insert_transaction(parsed)
        log.debug(f"sending pushover notification for message id {stored.email_message_id}")
        send_pushover(notification_manager, pushover_token, pushover_user, stored)
    log.info(f"recorded {count} transactions")

    # Phase 2: poll Pushover for every unacknowledged notification.
    log.info("checking transactions without an acknowledgement")
    pending = notification_manager.list_unacknowledged_transactions_with_notifications()
    for transaction, notification in pending:
        receipt_status = get_pushover_receipt_status(notification.pushover_receipt, pushover_token)
        # if status is falsey, it has been >= 1 week; independently of that,
        # a notification last updated a day or more ago is re-sent.
        if not receipt_status["status"] or datetime.now() >= notification.ts_update + timedelta(days=1):
            log.info(f"notification id {notification.id} for transaction id {transaction.id} is expired, retrying")
            send_pushover(notification_manager, pushover_token, pushover_user, transaction)
            notification_manager.expire_notification(notification.id)
        elif receipt_status["acknowledged"]:
            log.info(f"notification id {notification.id} for transaction id {transaction.id} is acknowledged")
            notification_manager.acknowledge_notification(notification.id)
            transaction_manager.acknowledge_transaction(transaction.id)
        else:
            log.debug(f"notification id {notification.id} is not expired nor acknowledged, continuing")