143 lines
4.5 KiB
Python
143 lines
4.5 KiB
Python
|
#!/usr/bin/env python3
|
||
|
from email import message
|
||
|
import imaplib
|
||
|
import os
|
||
|
import email
|
||
|
from html.parser import HTMLParser
|
||
|
from typing import Any, Iterable, List, Set
|
||
|
|
||
|
class Transaction:
    """Plain record holding the fields of one transaction.

    Each constructor argument is stored unchanged on an attribute of the
    same name. All of them default to the empty string, which ``all_set``
    treats as "not filled in yet".
    """

    def __init__(self, message_id="", amount="", card_ending_in="", merchant="", date="", time=""):
        # Assign in declaration order so the instance dict keeps a stable layout.
        initial_values = (
            ("message_id", message_id),
            ("amount", amount),
            ("card_ending_in", card_ending_in),
            ("merchant", merchant),
            ("date", date),
            ("time", time),
        )
        for attr_name, attr_value in initial_values:
            setattr(self, attr_name, attr_value)

    def all_set(self) -> bool:
        """Return True only if every instance attribute holds a truthy value."""
        return all(vars(self).values())
|
||
|
|
||
|
class MyHTMLParser(HTMLParser):
    """HTML parser that fills a ``Transaction`` (``self.output``) from text nodes.

    Two kinds of text are recognised:

    * a chunk starting with ``"Amount: $"`` -- the part after ``"Amount: "``
      becomes ``output.amount``;
    * one of the labels in ``_FIELD_BY_LABEL`` -- the *next* non-empty text
      chunk becomes the value of the matching ``output`` attribute.

    Once ``output.all_set()`` is true, further text is ignored.
    """

    # Label text -> name of the Transaction attribute the following chunk fills.
    _FIELD_BY_LABEL = {
        "Card Ending In": "card_ending_in",
        "Merchant": "merchant",
        "Date": "date",
        "Time": "time",
    }

    def __init__(self):
        self.output = Transaction()
        # Running balance of start tags minus end tags seen so far.
        # NOTE(review): every tag is counted, not only <td>, and void tags
        # such as <br> are never balanced by an end tag -- confirm intended.
        self.__depth = 0
        # Attribute name awaiting its value, or None when no label is pending.
        self.__pending_field: str | None = None
        super().__init__()

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Count one more opened tag."""
        self.__depth += 1

    def handle_endtag(self, tag: str) -> None:
        """Count one closed tag."""
        self.__depth -= 1

    def handle_data(self, data: str) -> None:
        """Route one text chunk to the transaction field it belongs to."""
        # Text is only considered while the tag balance is positive and the
        # transaction still has unfilled fields.
        if self.__depth <= 0 or self.output.all_set():
            return

        text = data.strip()
        if not text:
            return

        # A label was seen on the previous chunk: this chunk is its value.
        if self.__pending_field is not None:
            field_name, self.__pending_field = self.__pending_field, None
            setattr(self.output, field_name, text)
            return

        if text.startswith("Amount: $"):
            self.output.amount = text.removeprefix("Amount: ")
            return

        # Remember the field if this chunk is a known label; otherwise this
        # leaves the pending slot at None and the chunk is dropped.
        self.__pending_field = self._FIELD_BY_LABEL.get(text)
|
||
|
|
||
|
|
||
|
def _decode_text_parts(msg: message.Message) -> str:
    """Return the decoded text of every ``text/*`` leaf part of *msg*, concatenated.

    Non-text parts (images, attachments) are skipped so that binary payloads
    are never pushed through a text decoder. Each part is decoded with its
    declared charset, falling back to UTF-8 when none is declared or the
    declared name is unknown; undecodable bytes are replaced, not raised on.
    """
    chunks = []
    # walk() yields the message itself for non-multipart messages, so this
    # one loop covers both the multipart and the single-part case.
    for part in msg.walk():
        if part.is_multipart() or part.get_content_maintype() != "text":
            continue

        payload = part.get_payload(decode=True)
        if payload is None:
            continue

        charset = part.get_content_charset() or "utf-8"
        try:
            chunks.append(payload.decode(charset, errors="replace"))
        except LookupError:
            # Charset name the codec registry does not know.
            chunks.append(payload.decode("utf-8", errors="replace"))

    return "".join(chunks)


def get_transactions(imap_host: str, imap_port: int, imap_user: str, imap_password: str, imap_mailbox: str, ignore_message_ids: Set[str]) -> Iterable[Transaction]:
    """Yield one ``Transaction`` per matching message in an IMAP mailbox.

    Connects over SSL, logs in, opens *imap_mailbox* read-only and searches
    for messages whose subject contains ``transaction was made``. Each hit is
    fetched in full, its text parts are fed to ``MyHTMLParser`` and the
    resulting ``Transaction`` is yielded with ``message_id`` set to the
    message's ``Message-ID`` header (stripped; ``""`` when the header is
    missing).

    Args:
        imap_host: IMAP server host name.
        imap_port: IMAP-over-SSL port.
        imap_user: Login name.
        imap_password: Login password.
        imap_mailbox: Mailbox name; it is wrapped in double quotes for SELECT.
        ignore_message_ids: Message-IDs to skip without parsing.

    Yields:
        The parsed ``Transaction`` for every message not in
        *ignore_message_ids*. Fields the parser did not find keep their
        defaults.

    Raises:
        imaplib.IMAP4.error: propagated from login/select/search/fetch.
    """
    with imaplib.IMAP4_SSL(host=imap_host, port=imap_port) as mail:
        mail.login(user=imap_user, password=imap_password)
        mail.select(mailbox=f'"{imap_mailbox}"', readonly=True)

        status, found = mail.search(None, 'SUBJECT "transaction was made"')
        # A non-OK status or an empty result carries no message numbers.
        if status != "OK" or not found or not found[0]:
            return

        for num in found[0].split():
            status, fetched = mail.fetch(num, "(RFC822)")
            # A successful fetch returns a (header, literal) tuple first;
            # anything else (e.g. message vanished) has no body to parse.
            if status != "OK" or not fetched or not isinstance(fetched[0], tuple):
                continue

            msg = email.message_from_bytes(fetched[0][1])
            raw_id = msg.get("Message-ID")
            # Strip so the value matches the stripped lines callers store.
            msg_id = str(raw_id).strip() if raw_id is not None else ""

            # Messages without an ID can never be matched against the set.
            if msg_id and msg_id in ignore_message_ids:
                continue

            parser = MyHTMLParser()
            parser.output.message_id = msg_id
            parser.feed(_decode_text_parts(msg))

            yield parser.output
|
||
|
|
||
|
|
||
|
def main() -> None:
    """Fetch transactions over IMAP and record the IDs of processed messages.

    Configuration comes from environment variables:

    * ``IMAP_PASSWORD_FILE`` -- path of a file containing the IMAP password.
    * ``MESSAGE_ID_LIST`` -- path of a text file with one already-processed
      Message-ID per line; created if missing, appended to on each run.
    * ``IMAP_HOST``, ``IMAP_PORT``, ``IMAP_USER``, ``IMAP_MAILBOX`` --
      connection settings passed to ``get_transactions``.

    Raises:
        KeyError: if a required environment variable is unset.
        ValueError: if ``IMAP_PORT`` is not an integer.
    """
    with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file:
        # Secret files commonly end with a line terminator that is not part
        # of the password; sending it along makes the login fail.
        imap_password = password_file.read().rstrip("\r\n")

    message_id_ignore_set: Set[str] = set()
    # "a+" creates the file when absent and always appends on write; the
    # explicit seek is needed because the initial position is at the end.
    with open(os.environ["MESSAGE_ID_LIST"], "a+") as message_id_file:
        message_id_file.seek(0, 0)
        for line in message_id_file:
            known_id = line.strip()
            print(known_id)
            message_id_ignore_set.add(known_id)

        transactions = get_transactions(
            imap_host=os.environ["IMAP_HOST"],
            imap_port=int(os.environ["IMAP_PORT"]),
            imap_user=os.environ["IMAP_USER"],
            imap_password=imap_password,
            imap_mailbox=os.environ["IMAP_MAILBOX"],
            ignore_message_ids=message_id_ignore_set,
        )

        for transaction in transactions:
            # A message without a Message-ID header has nothing to record
            # (and a None value cannot be written to a text file).
            if not transaction.message_id:
                continue
            message_id_file.write(f"{transaction.message_id}\n")


if __name__ == "__main__":
    main()
|