citi-alerts/main.py

250 lines
8 KiB
Python
Executable file

#!/usr/bin/env python3
import json
import imaplib
import logging
import os
import email
from html.parser import HTMLParser
from typing import Any, Iterable, List, Set
import http.client
import urllib
from datetime import datetime, timedelta
# Lazily-created logger shared by the whole script; populated on first use.
__logger = None


def get_logger() -> logging.Logger:
    """Return the shared ``citi-alerts`` logger, creating it on first call.

    The level comes from the ``LOG_LEVEL`` environment variable, matched
    case-insensitively against the standard level names (``debug``,
    ``WARNING``, ...). A missing or unrecognised value falls back to
    ``logging.INFO``. Later calls return the same logger object without
    re-reading the environment.
    """
    global __logger
    if __logger is None:
        requested = os.environ.get("LOG_LEVEL", "").upper()
        # getLevelName() is the public lookup: it maps a known level *name*
        # to its number and returns a "Level <x>" string for anything else,
        # so a non-int result means the name was not recognised.
        level = logging.getLevelName(requested)
        if not isinstance(level, int):
            level = logging.INFO
        logging.basicConfig(level=level)
        __logger = logging.getLogger("citi-alerts")
        __logger.setLevel(level)
    return __logger
class Transaction:
    """Plain container for the details of one transaction alert.

    Every field defaults to the empty string, which ``all_set`` treats as
    "not filled in yet".
    """

    def __init__(
        self,
        message_id="",
        amount="",
        card_ending_in="",
        merchant="",
        date="",
        time="",
    ):
        # Assignment order is kept stable because it fixes the key order of
        # the instance ``__dict__``.
        self.message_id = message_id
        self.amount = amount
        self.card_ending_in = card_ending_in
        self.merchant = merchant
        self.date = date
        self.time = time

    def all_set(self) -> bool:
        """Return True only when every attribute holds a truthy value."""
        return all(vars(self).values())
class MyHTMLParser(HTMLParser):
    """Collect transaction details from the HTML body of an alert e-mail.

    Text nodes are examined in document order. A node that reads exactly
    ``Card Ending In``, ``Merchant``, ``Date`` or ``Time`` is treated as a
    label, and the next non-blank text node is stored as that field's value.
    A node starting with ``Amount: $`` carries the amount inline. Results
    accumulate in ``self.output``.
    """

    # Label text -> name of the Transaction attribute it introduces.
    _LABEL_FIELDS = {
        "Card Ending In": "card_ending_in",
        "Merchant": "merchant",
        "Date": "date",
        "Time": "time",
    }

    def __init__(self):
        super().__init__()
        self.output = Transaction()
        # Start tags seen minus end tags seen so far (any tag name).
        self.__open_tags = 0
        # Attribute that the next non-blank text node belongs to, or None
        # when no label is waiting for its value.
        self.__pending_field = None

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Count one more opened tag."""
        self.__open_tags += 1

    def handle_endtag(self, tag: str) -> None:
        """Count one more closed tag."""
        self.__open_tags -= 1

    def handle_data(self, data: str) -> None:
        """Store *data* as a field value, or note which field comes next."""
        # Text is only considered while the tag balance is positive, and
        # nothing more is recorded once every output field is filled.
        if self.__open_tags <= 0 or self.output.all_set():
            return

        text = data.strip()
        if not text:
            return

        # A label was seen earlier: this text is its value.
        field = self.__pending_field
        if field is not None:
            self.__pending_field = None
            setattr(self.output, field, text)
            return

        if text.startswith("Amount: $"):
            self.output.amount = text.removeprefix("Amount: ")
            return

        # Either a recognised label (remember its field) or unrelated text
        # (lookup yields None, so nothing is pending).
        self.__pending_field = self._LABEL_FIELDS.get(text)
def grouper(data: Iterable, group_size: int) -> Iterable[List]:
    """Split *data* into consecutive lists of at most *group_size* items.

    Items keep their original order. Every yielded list has exactly
    *group_size* elements except possibly the last one, which holds the
    remainder. An empty *data* yields nothing.

    Each group is an independent list, so callers may keep or collect the
    groups (for example ``list(grouper(...))``) without later groups
    overwriting earlier ones.

    Raises:
        ValueError: if *group_size* is smaller than 1. The check runs when
            iteration starts, because this is a generator.
    """
    if group_size < 1:
        raise ValueError("group_size must be >= 1")
    group = []
    for item in data:
        group.append(item)
        if len(group) == group_size:
            yield group
            # Start a new list rather than clearing the one just handed out;
            # clearing would empty the group the caller is still holding.
            group = []
    if group:
        yield group
def send_pushover(
    token: str,
    user: str,
    transaction: Transaction,
    timeout: float = 30.0,
) -> dict[str, Any]:
    """Send one transaction as a priority-2 Pushover notification.

    Args:
        token: Pushover application token.
        user: Pushover user (or group) key.
        transaction: the transaction whose amount, merchant, card, date and
            time are formatted into the message text.
        timeout: seconds to wait on the HTTPS connection before giving up,
            so a stalled connection cannot block the run indefinitely.

    Returns:
        The JSON document returned by Pushover, decoded into a dict.

    Raises:
        Exception: if Pushover answers with any status other than 200; the
            response body is included in the message.
        OSError: on connection failures or timeouts (raised by http.client).
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urlencode

    payload = urlencode(
        {
            "token": token,
            "user": user,
            "priority": "2",
            "retry": "3600",  # re-alert every hour until acknowledged
            "expire": "10800",  # 3 hours, max allowed by Pushover's API
            "message": f'{transaction.amount} at "{transaction.merchant}" with card {transaction.card_ending_in} on {transaction.date} at {transaction.time}',
        }
    )
    conn = http.client.HTTPSConnection("api.pushover.net:443", timeout=timeout)
    try:
        conn.request(
            "POST",
            "/1/messages.json",
            payload,
            {"Content-type": "application/x-www-form-urlencoded"},
        )
        response = conn.getresponse()
        body = response.read()
        if response.status != 200:
            # errors="replace" keeps a malformed error body from masking the
            # real failure with a UnicodeDecodeError.
            raise Exception(
                f"failed to send notifcation via Pushover: {body.decode('utf-8', errors='replace')}"
            )
        return json.loads(body)
    finally:
        conn.close()
def get_transactions(
    imap_host: str,
    imap_port: int,
    imap_user: str,
    imap_password: str,
    imap_mailbox: str,
    ignore_message_ids: Set[str],
) -> Iterable[Transaction]:
    """Yield a Transaction for each recent alert e-mail not already handled.

    Connects to the IMAP server over SSL, opens *imap_mailbox* read-only and
    searches for messages from the last seven days whose subject contains
    ``transaction was made``. Matches are fetched in batches of up to 50 and
    each body is run through ``MyHTMLParser``. Messages whose Message-ID is
    in *ignore_message_ids* are skipped.

    This is a generator: the IMAP connection is opened on the first
    ``next()`` and stays open until iteration finishes.

    Args:
        imap_host: IMAP server host name.
        imap_port: IMAP-over-SSL port.
        imap_user: login name.
        imap_password: login password.
        imap_mailbox: mailbox to search; it is wrapped in double quotes
            before being sent to the server.
        ignore_message_ids: Message-ID values (without surrounding
            whitespace) that must not be yielded again.

    Raises:
        Exception: if selecting the mailbox, searching, or fetching returns
            a status other than ``OK``.
    """
    log = get_logger()
    log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL")
    with imaplib.IMAP4_SSL(host=imap_host, port=imap_port) as mail:
        log.info(f"attemping to sign in with {imap_user}")
        mail.login(user=imap_user, password=imap_password)
        log.info(f"selecting mailbox {imap_mailbox}")
        status, _ = mail.select(mailbox=f'"{imap_mailbox}"', readonly=True)
        if status != "OK":
            # Fail here with a clear message instead of at the SEARCH below.
            raise Exception(f"failed to select mailbox {imap_mailbox}: {status}")

        # NOTE(review): %b is locale-dependent, while IMAP expects English
        # month abbreviations; this is fine unless LC_TIME has been changed.
        one_week_ago = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
        log.info(f"searching for Citi transaction e-mails as old as one week ago ({one_week_ago})")
        status, found = mail.search(None, f'SUBJECT "transaction was made" SINCE {one_week_ago}')
        if status != "OK":
            raise Exception(f"failed to search for Citi transaction e-mails: {status}")

        # imaplib may hand back [None] when the server sent no SEARCH data.
        id_blob = found[0] if found and found[0] else b""
        for email_ids in grouper(id_blob.split(), 50):
            log.info("getting a batch of up to 50 emails")
            status, fetched = mail.fetch(
                ",".join(email_id.decode("utf-8") for email_id in email_ids),
                "(RFC822)",
            )
            if status != "OK":
                raise Exception(f"failed to fetch Citi transaction e-mails: {status}")

            for email_data in fetched:
                # Message content arrives as (envelope, bytes) tuples; the
                # other entries in the response are skipped.
                if not isinstance(email_data, tuple):
                    continue
                msg = email.message_from_bytes(email_data[1])

                # Normalise surrounding whitespace (a folded header keeps
                # its line break) so the value matches the stripped entries
                # of the ignore set and stays on one line when recorded.
                raw_id = msg.get("message-id")
                msg_id = None if raw_id is None else str(raw_id).strip()
                if msg_id in ignore_message_ids:
                    log.debug(f"ignoring message id {msg_id}")
                    continue

                if msg.is_multipart():
                    # Container parts decode to None; keep only leaf payloads.
                    payloads = (part.get_payload(decode=True) for part in msg.walk())
                    body = b"".join(p for p in payloads if p is not None)
                else:
                    body = msg.get_payload(decode=True) or b""

                parser = MyHTMLParser()
                parser.output.message_id = msg_id
                # Undecodable bytes are replaced rather than aborting the
                # whole run on a single oddly-encoded message.
                parser.feed(body.decode("utf-8", errors="replace"))
                yield parser.output
# Script entry point: notify about every new transaction e-mail and remember
# which messages have been handled.
#
# Environment variables read here: PUSHOVER_TOKEN, PUSHOVER_USER,
# IMAP_PASSWORD_FILE, MESSAGE_ID_LIST, IMAP_HOST, IMAP_PORT, IMAP_USER,
# IMAP_MAILBOX. A missing variable raises KeyError.
if __name__ == "__main__":
    log = get_logger()
    pushover_token = os.environ["PUSHOVER_TOKEN"]
    pushover_user = os.environ["PUSHOVER_USER"]
    with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file:
        # Drop the line terminator most editors/tools append to the file;
        # it is not part of the password.
        imap_password = password_file.read().rstrip("\r\n")
    message_id_ignore_set: Set[str] = set()
    # "a+" creates the list on first run and appends on every write; the
    # seek moves the read position back to the start to load existing ids.
    with open(os.environ["MESSAGE_ID_LIST"], "a+") as message_id_file:
        message_id_file.seek(0, 0)
        for message_id in message_id_file:
            message_id_ignore_set.add(message_id.strip())
        transactions = get_transactions(
            imap_host=os.environ["IMAP_HOST"],
            imap_port=int(os.environ["IMAP_PORT"]),
            imap_user=os.environ["IMAP_USER"],
            imap_password=imap_password,
            imap_mailbox=os.environ["IMAP_MAILBOX"],
            ignore_message_ids=message_id_ignore_set,
        )
        count = 0
        for transaction in transactions:
            count += 1
            log.info(f"got message id {transaction.message_id}: {json.dumps(transaction.__dict__)}")
            log.debug(f"sending pushover notification for message id {transaction.message_id}")
            send_pushover(pushover_token, pushover_user, transaction)
            log.debug(f"recording message id {transaction.message_id} to message id list")
            message_id_file.writelines([transaction.message_id, "\n"])
            # Persist immediately so an abrupt termination cannot lose the
            # record and re-send this alert on the next run.
            message_id_file.flush()
    log.info(f"recorded {count} transactions")