Created
February 18, 2020 19:44
-
-
Save JosephRedfern/79314fdf1875166e08489746029851c2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imaplib
import random
import re
from email.parser import BytesHeaderParser
from typing import List, Optional

import tqdm
class EmailMiner:
    """
    Mine an IMAP inbox for the mail servers that handled each message.

    Based on method/code described here: https://obem.be/2020/02/18/mining-my-mailbox-for-top-email-service-providers.html

    Typical use is ``EmailMiner(...).analyse()``, which downloads messages
    from ``INBOX``, extracts at most one hostname per message from its
    ``Received`` headers and prints the hostnames, one per line.
    """

    # "by <host>" clause of a Received header. The clause normally follows a
    # leading "from ..." clause (possibly on a folded continuation line), so
    # the pattern must be *searched* for, not matched at position 0.
    _BY_HOST = re.compile(r"\bby\s+(?P<host>[^\s;]+)")
    # "(EHLO <host>)" comment that some MTAs record in a Received header.
    _EHLO_HOST = re.compile(r"EHLO (?P<host>[^)]*)")

    def __init__(self, hostname: str, username: str, password: str,
                 port: int = imaplib.IMAP4_PORT, use_ssl: bool = True):
        """
        Store the connection settings; no network activity happens here.

        :param hostname: IMAP server to connect to.
        :param username: login name.
        :param password: login password.
        :param port: server port (defaults to ``imaplib.IMAP4_PORT``).
        :param use_ssl: connect with ``imaplib.IMAP4_SSL`` instead of ``imaplib.IMAP4``.
        """
        self.hostname = hostname
        self.username = username
        self.password = password
        self.port = port
        self.use_ssl = use_ssl
        # Raw messages (bytes); populated by analyse().
        self.emails = None

    def analyse(self, limit: Optional[int] = None):
        """
        Download messages, extract a hostname from each and print them.

        :param limit: if truthy, analyse a random sample of at most this many
            messages; otherwise analyse every message in the inbox.
        """
        self.emails = self.download_emails(limit=limit)
        for hostname in self.process_emails():
            print(hostname)

    def download_emails(self, limit: Optional[int] = None) -> List[bytes]:
        """
        Download emails from IMAP server.

        :param limit: if truthy, fetch a random sample of at most ``limit``
            messages (capped at the number of messages in the mailbox);
            otherwise fetch everything.
        :return: the raw RFC822 bytes of each fetched message.
        :raises ValueError: if ``limit`` is negative (from ``random.sample``).
        """
        # annoyingly, rather than just passing a flag, we need to use a different class for SSL.
        imap_class = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4

        with imap_class(host=self.hostname, port=self.port) as imap:
            imap.login(self.username, self.password)
            # Open read-only: fetching RFC822 from a read-write mailbox would
            # implicitly mark every sampled message as \Seen.
            imap.select("INBOX", readonly=True)

            _, data = imap.search(None, "ALL")
            # split() without an argument yields [] for an empty mailbox,
            # whereas split(b" ") would yield [b""] and trigger a bogus fetch.
            all_message_ids = data[0].split() if data and data[0] else []

            if limit:
                # Cap the sample size so a limit larger than the mailbox does
                # not make random.sample raise.
                sample_size = min(limit, len(all_message_ids))
                message_ids = random.sample(all_message_ids, sample_size)
            else:
                message_ids = all_message_ids

            messages = []
            for mid in tqdm.tqdm(message_ids):
                _, data = imap.fetch(mid, "(RFC822)")
                # A successful fetch puts an (envelope, raw_message) tuple
                # first; skip anything else instead of crashing on an index.
                if not data or not isinstance(data[0], tuple):
                    continue
                messages.append(data[0][1])

        return messages

    def process_emails(self) -> List[str]:
        """
        Extract one hostname per message from the ``Received`` headers.

        For each message in ``self.emails``: if there is more than one
        ``Received`` header, the host named in the ``by`` clause of the second
        one is used; failing that, the host in an ``EHLO`` clause of the first
        header is used. Messages with no ``Received`` header, or where neither
        pattern is found, contribute nothing.

        :return: the extracted hostnames, in message order (duplicates kept).
        """
        hostnames = []
        parser = BytesHeaderParser()

        # ``or []`` lets this be called safely before any download happened.
        for raw_message in self.emails or []:
            parsed = parser.parsebytes(raw_message)
            received_headers = parsed.get_all("Received")
            if not received_headers:  # this is null for some emails? perhaps ones sent by google?
                continue

            # The default (compat32) policy returns email.header.Header
            # objects instead of str for values containing raw non-ASCII
            # bytes; normalise so the regexes always receive text.
            received_headers = [str(header) for header in received_headers]

            relevant_hostname = None

            # according to the original blog post, if there's >1 received header, we should examine the second.
            if len(received_headers) > 1:
                match = self._BY_HOST.search(received_headers[1])
                if match:
                    relevant_hostname = match.group("host")

            if relevant_hostname is None:
                match = self._EHLO_HOST.search(received_headers[0])
                if match:
                    # Diagnostic output retained from the original script.
                    print("got one!")
                    relevant_hostname = match.group("host")

            if relevant_hostname:
                hostnames.append(relevant_hostname)

        return hostnames
if __name__ == "__main__":
    # Connection settings are read from a separate ``credentials`` module,
    # which must define these four names.
    from credentials import HOSTNAME, USERNAME, PASSWORD, PORT

    # Analyse a random sample of up to 1000 inbox messages over SSL.
    miner = EmailMiner(
        hostname=HOSTNAME,
        username=USERNAME,
        password=PASSWORD,
        port=PORT,
        use_ssl=True,
    )
    miner.analyse(limit=1000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment