#!/usr/bin/env python3 import sqlite3 import json import imaplib import logging import os import email from html.parser import HTMLParser from typing import Any, Callable, Iterable, List, Set import http.client import urllib from datetime import datetime, timedelta from db import migrate_db, create_conn from db.notification import Notification, NotificationManager from db.transaction import Transaction, TransactionManager __MAX_PUSHOVER_EXPIRATION = 10800 # 3 hours, max allowed by Pushover's API __logger = None def get_logger() -> logging.Logger: global __logger if __logger is None: level = logging._nameToLevel.get(os.environ.get("LOG_LEVEL", "").upper(), logging.INFO) logging.basicConfig(level=level) __logger = logging.getLogger("citi-alerts") __logger.setLevel(level) return __logger class MyHTMLParser(HTMLParser): def __init__(self): self.__output = {} self.__start_tds = 0 self.__processing_card_ending_in = False self.__processing_merchant = False self.__processing_date = False self.__processing_time = False super().__init__() def output(self, email_message_id: str) -> Transaction: transaction = Transaction( id=0, acknowledged=False, amount=self.__output["amount"], card_ending_in=self.__output["card_ending_in"], date=self.__output["date"], email_message_id=email_message_id, merchant=self.__output["merchant"], time=self.__output["time"], ts_insert=datetime.now(), ts_update=datetime.now(), ) self.__output = {} return transaction def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: self.__start_tds += 1 def handle_endtag(self, tag: str) -> None: self.__start_tds -= 1 def handle_data(self, data: str) -> None: if self.__start_tds > 0: if len(self.__output) == 5: return data = data.strip() if not data: return if self.__processing_card_ending_in: self.__processing_card_ending_in = False self.__output["card_ending_in"] = data return if self.__processing_merchant: self.__processing_merchant = False self.__output["merchant"] = data return if self.__processing_date: self.__processing_date = False self.__output["date"] = data return if self.__processing_time: self.__processing_time = False self.__output["time"] = data return if data.startswith("Amount: $"): self.__output["amount"] = data.removeprefix("Amount: ") return if data == "Card Ending In": self.__processing_card_ending_in = True return if data == "Merchant": self.__processing_merchant = True return if data == "Date": self.__processing_date = True return if data == "Time": self.__processing_time = True return def grouper(data: Iterable, group_size: int) -> Iterable[List]: if group_size < 1: raise Exception("group_size must be >= 1") group = [] group_len = 0 for d in data: group.append(d) group_len += 1 if group_len == group_size: yield group group.clear() group_len = 0 if group_len: yield group def send_pushover(notification_manager: NotificationManager, token: str, user: str, transaction: Transaction) -> dict[str, Any]: conn = http.client.HTTPSConnection("api.pushover.net:443") try: conn.request( "POST", "/1/messages.json", urllib.parse.urlencode( { "token": token, "user": user, "priority": "2", "retry": "3600", "expire": f"{__MAX_PUSHOVER_EXPIRATION}", "message": f'{transaction.amount} at "{transaction.merchant}" with card {transaction.card_ending_in} on {transaction.date} at {transaction.time}', } ), {"Content-type": "application/x-www-form-urlencoded"}, ) response = conn.getresponse() body = response.read() if response.getcode() != 200: raise Exception(f"failed to send notifcation via Pushover: {str(body, 'utf-8')}") response_payload = json.loads(body) notification = notification_manager.insert_notification(Notification( id=0, ts_insert=datetime.now(), ts_update=datetime.now(), acknowledged=False, expired=False, id_transaction=transaction.id, pushover_receipt=response_payload["receipt"], user=pushover_user, )) log.debug(f"recording notification id {notification.id}") return response_payload finally: conn.close() def get_pushover_receipt_status(receipt: str, token: str) -> dict[str, Any]: conn = http.client.HTTPSConnection("api.pushover.net:443") try: conn.request( "GET", f"/1/receipts/{receipt}.json", urllib.parse.urlencode( { "token": token, } ), {"Content-type": "application/x-www-form-urlencoded"}, ) response = conn.getresponse() return json.loads(response.read()) finally: conn.close() def get_transactions( imap_host: str, imap_port: int, imap_user: str, imap_password: str, imap_mailbox: str, ignore_message_id_callback: Callable[[str], bool], ) -> Iterable[Transaction]: log = get_logger() log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL") with imaplib.IMAP4_SSL(host=imap_host, port=imap_port) as mail: log.info(f"attemping to sign in with {imap_user}") mail.login(user=imap_user, password=imap_password) log.info(f"selecting mailbox {imap_mailbox}") mail.select(mailbox=f'"{imap_mailbox}"', readonly=True) one_week_ago = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y") log.info(f"searching for Citi transaction e-mails as old as one week ago ({one_week_ago})") status, data = mail.search(None, f'SUBJECT "transaction was made" SINCE {one_week_ago}') if status != "OK": raise Exception(f"failed to search for Citi transaction e-mails: {status}") for email_ids in grouper(data[0].split(), 50): log.info("getting a batch of up to 50 emails") status, data = mail.fetch( ",".join(map(lambda email_id: str(email_id, "utf-8"), email_ids)), "(RFC822)", ) if status != "OK": raise Exception(f"failed to fetch Citi transaction e-mails: {status}") for email_data in data: if not isinstance(email_data, tuple): continue msg = email.message_from_bytes(email_data[1]) msg_id = msg.get("message-id") if ignore_message_id_callback(msg_id): log.debug(f"ignoring message id {msg_id}") continue body = b"" if msg.is_multipart(): for part in msg.walk(): sub_body = part.get_payload(decode=True) if sub_body is None: continue body += sub_body else: body = msg.get_payload(decode=True) parser = MyHTMLParser() parser.feed(str(body, "utf-8")) yield parser.output(msg_id) if __name__ == "__main__": log = get_logger() db_conn = create_conn(os.environ["DB_FILE_PATH"]) migrate_db(log, db_conn) transaction_manager = TransactionManager(log, db_conn) notification_manager = NotificationManager(log, db_conn) pushover_token = os.environ["PUSHOVER_TOKEN"] pushover_user = os.environ["PUSHOVER_USER"] with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file: imap_password = password_file.read() transactions = get_transactions( imap_host=os.environ["IMAP_HOST"], imap_port=int(os.environ["IMAP_PORT"]), imap_user=os.environ["IMAP_USER"], imap_password=imap_password, imap_mailbox=os.environ["IMAP_MAILBOX"], ignore_message_id_callback=lambda email_message_id: transaction_manager.get_by_email_message_id( email_message_id ) is not None, ) count = 0 for transaction in transactions: count += 1 log.info(f"got message id {transaction.email_message_id}: {transaction.__dict__}") log.debug(f"recording message id {transaction.email_message_id} to message id list") transaction = transaction_manager.insert_transaction(transaction) log.debug(f"sending pushover notification for message id {transaction.email_message_id}") response = send_pushover(notification_manager, pushover_token, pushover_user, transaction) log.info(f"recorded {count} transactions") log.info("checking transactions without an acknowledgement") for transaction, notification in notification_manager.list_unacknowledged_transactions_with_notifications(): response = get_pushover_receipt_status(notification.pushover_receipt, pushover_token) # if status is falsey, it has been >= 1 week if not response["status"] or datetime.now() >= notification.ts_update + timedelta(days=1): log.info(f"notification id {notification.id} for transaction id {transaction.id} is expired, retrying") send_pushover(notification_manager, pushover_token, pushover_user, transaction) notification_manager.expire_notification(notification.id) continue if response["acknowledged"]: log.info(f"notification id {notification.id} for transaction id {transaction.id} is acknowledged") notification_manager.acknowledge_notification(notification.id) transaction_manager.acknowledge_transaction(transaction.id) continue log.debug(f"notification id {notification.id} is not expired nor acknowledged, continuing")