Add Pushover support and make the IMAP searching more efficient

This commit is contained in:
Tony Blyler 2021-11-07 01:50:24 -04:00
parent a914d20bec
commit 2e2a18c037
2 changed files with 138 additions and 24 deletions

7
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,7 @@
{
"python.formatting.provider": "black",
"python.formatting.blackArgs": [
"-l",
"120"
]
}

155
main.py
View file

@ -1,10 +1,28 @@
#!/usr/bin/env python3
from email import message
import json
import imaplib
import logging
import os
import email
from html.parser import HTMLParser
from typing import Any, Iterable, List, Set
import http.client
import urllib
from datetime import datetime, timedelta
__logger = None
def get_logger() -> logging.Logger:
global __logger
if __logger is None:
level = logging._nameToLevel.get(os.environ.get("LOG_LEVEL", "").upper(), logging.INFO)
logging.basicConfig(level=level)
__logger = logging.getLogger("citi-alerts")
__logger.setLevel(level)
return __logger
class Transaction:
def __init__(self, message_id="", amount="", card_ending_in="", merchant="", date="", time=""):
@ -22,6 +40,7 @@ class Transaction:
return True
class MyHTMLParser(HTMLParser):
def __init__(self):
self.output = Transaction()
@ -88,38 +107,118 @@ class MyHTMLParser(HTMLParser):
return
def get_transactions(imap_host: str, imap_port: int, imap_user: str, imap_password: str, imap_mailbox: str, ignore_message_ids: Set[str]) -> Iterable[Transaction]:
def grouper(data: Iterable, group_size: int) -> Iterable[List]:
if group_size < 1:
raise Exception("group_size must be >= 1")
group = []
group_len = 0
for d in data:
group.append(d)
group_len += 1
if group_len == group_size:
yield group
group.clear()
group_len = 0
if group_len:
yield group
def send_pushover(token: str, user: str, transaction: Transaction) -> dict[str, Any]:
conn = http.client.HTTPSConnection("api.pushover.net:443")
try:
conn.request(
"POST",
"/1/messages.json",
urllib.parse.urlencode(
{
"token": token,
"user": user,
"priority": "2",
"retry": "3600", # 3 hours, max allowed by Pushover's API
"expire": "10800", # 3 hours, max allowed by Pushover's API
"message": f'{transaction.amount} at "{transaction.merchant}" with card {transaction.card_ending_in} on {transaction.date} at {transaction.time}',
}
),
{"Content-type": "application/x-www-form-urlencoded"},
)
response = conn.getresponse()
body = response.read()
if response.getcode() != 200:
raise Exception(f"failed to send notifcation via Pushover: {str(body, 'utf-8')}")
return json.loads(body)
finally:
conn.close()
def get_transactions(
imap_host: str,
imap_port: int,
imap_user: str,
imap_password: str,
imap_mailbox: str,
ignore_message_ids: Set[str],
) -> Iterable[Transaction]:
log = get_logger()
log.info(f"attempting to connect to {imap_host}:{imap_port} with SSL")
with imaplib.IMAP4_SSL(host=imap_host, port=imap_port) as mail:
log.info(f"attemping to sign in with {imap_user}")
mail.login(user=imap_user, password=imap_password)
log.info(f"selecting mailbox {imap_mailbox}")
mail.select(mailbox=f'"{imap_mailbox}"', readonly=True)
typ, data = mail.search(None, 'SUBJECT "transaction was made"')
for num in data[0].split():
typ, data = mail.fetch(num, "(RFC822)")
msg = email.message_from_bytes(data[0][1])
msg_id = msg.get("Message-ID")
if msg_id in ignore_message_ids:
continue
one_week_ago = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
log.info(f"searching for Citi transaction e-mails as old as one week ago ({one_week_ago})")
status, data = mail.search(None, f'SUBJECT "transaction was made" SINCE {one_week_ago}')
if status != "OK":
raise Exception(f"failed to search for Citi transaction e-mails: {status}")
body = b""
if msg.is_multipart():
for part in msg.walk():
sub_body = part.get_payload(decode=True)
if sub_body is None:
continue
for email_ids in grouper(data[0].split(), 50):
log.info("getting a batch of up to 50 emails")
status, data = mail.fetch(
",".join(map(lambda email_id: str(email_id, "utf-8"), email_ids)),
"(RFC822)",
)
if status != "OK":
raise Exception(f"failed to fetch Citi transaction e-mails: {status}")
body += sub_body
else:
body = msg.get_payload(decode=True)
for email_data in data:
if not isinstance(email_data, tuple):
continue
parser = MyHTMLParser()
parser.output.message_id = msg_id
parser.feed(str(body, 'utf-8'))
msg = email.message_from_bytes(email_data[1])
msg_id = msg.get("message-id")
yield parser.output
if msg_id in ignore_message_ids:
log.debug(f"ignoring message id {msg_id}")
continue
body = b""
if msg.is_multipart():
for part in msg.walk():
sub_body = part.get_payload(decode=True)
if sub_body is None:
continue
body += sub_body
else:
body = msg.get_payload(decode=True)
parser = MyHTMLParser()
parser.output.message_id = msg_id
parser.feed(str(body, "utf-8"))
yield parser.output
if __name__ == "__main__":
log = get_logger()
pushover_token = os.environ["PUSHOVER_TOKEN"]
pushover_user = os.environ["PUSHOVER_USER"]
with open(os.environ["IMAP_PASSWORD_FILE"]) as password_file:
imap_password = password_file.read()
@ -127,7 +226,6 @@ if __name__ == "__main__":
with open(os.environ["MESSAGE_ID_LIST"], "a+") as message_id_file:
message_id_file.seek(0, 0)
for message_id in message_id_file:
print(message_id.strip())
message_id_ignore_set.add(message_id.strip())
transactions = get_transactions(
@ -139,5 +237,14 @@ if __name__ == "__main__":
ignore_message_ids=message_id_ignore_set,
)
count = 0
for transaction in transactions:
message_id_file.writelines([transaction.message_id, "\n"])
count += 1
log.info(f"got message id {transaction.message_id}: {json.dumps(transaction.__dict__)}")
log.debug(f"sending pushover notification for message id {transaction.message_id}")
send_pushover(pushover_token, pushover_user, transaction)
log.debug(f"recording message id {transaction.message_id} to message id list")
message_id_file.writelines([transaction.message_id, "\n"])
log.info(f"recorded {count} transactions")