Created
August 8, 2024 06:07
-
-
Save ColeMurray/63baface4dcbede69390c5a3c9f22940 to your computer and use it in GitHub Desktop.
Using GPT to auto-label gmail
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import base64 | |
import json | |
import logging | |
from datetime import datetime, timedelta | |
from typing import List | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import Resource, build | |
from googleapiclient.errors import HttpError | |
from dotenv import load_dotenv | |
from openai import OpenAI | |
import sqlite3 | |
import requests | |
import json | |
import logging | |
import time | |
from ratelimit import limits, sleep_and_retry | |
from google.auth.transport.requests import Request | |
# Load environment variables (python-dotenv) so the os.getenv() lookups
# further down in this module can see them.
load_dotenv()
# Setup logging: INFO level, records formatted as "time - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# JSON Lines file to which each LLM request/response pair is appended.
llm_log_file = 'llm_interactions.jsonl'
# Constants and configurations
# OAuth scopes requested when authorizing against the Gmail API.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.labels",
    "https://www.googleapis.com/auth/gmail.modify",
]
# Local files used for the cached OAuth token, the OAuth client secrets and
# the timestamp of the previous run.
TOKEN_FILE = "token.json"
CREDENTIALS_FILE = "credentials.json"
LAST_RUN_FILE = "last_run.json"
# Label applied to every email that was successfully categorized.
PROCESSED_LABEL = "Processed"
# The closed set of categories the LLM may answer with; anything else is
# treated as "Other" by the categorize_* functions.
CATEGORY_LABELS = [
    "Marketing",
    "Response Needed / High Priority",
    "Bills",
    "Subscriptions",
    "Newsletters",
    "Personal",
    "Work",
    "Events",
    "Travel",
    "Receipts",
    "Low quality",
    "Notifications",
]
# SQLite file holding each email's label state as it was before labeling.
DATABASE_FILE = "email_states.db"
# When True, mailbox modifications are only logged instead of executed.
PREVIEW_MODE = False
# OpenAI configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = "gpt-4o-mini"
LLM_SERVICE = os.getenv("LLM_SERVICE", "OpenAI")  # Default to OpenAI if not specified
# Ollama API URL. 0.0.0.0 is a listen/bind address and is not a portable
# destination for client connections, so default to loopback instead. The
# OLLAMA_API_URL environment variable overrides it for remote/Docker setups.
OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://127.0.0.1:11434/api/chat")
# Set up rate limiting: adjust as needed for your local setup
@sleep_and_retry
@limits(calls=500, period=60)
def call_ollama_api(prompt, timeout=300):
    """Send a single-turn chat request to the local Ollama server.

    The request/response pair is appended to ``llm_log_file`` as one JSON
    line before the reply is returned.

    Args:
        prompt: Text sent as the only ``user`` message.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so an unresponsive
            server would stall the whole run.

    Returns:
        The ``message.content`` string of the Ollama reply (JSON text,
        because the request asks for ``"format": "json"``).

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts
            or non-2xx responses (logged, then re-raised).
    """
    payload = {
        "stream": False,
        "format": "json",
        "model": "llama3.1",
        "messages": [
            {"role": "user", "content": prompt}
        ],
    }
    try:
        start_time = time.time()
        response = requests.post(OLLAMA_API_URL, json=payload, timeout=timeout)
        response.raise_for_status()
        end_time = time.time()
        # Parse the body once and reuse it for both the log and the result.
        response_body = response.json()
        # Log the request and response
        log_entry = {
            "request_timestamp": start_time,
            "response_timestamp": end_time,
            "duration": end_time - start_time,
            "request": payload,
            "response": response_body,
        }
        with open(llm_log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry) + '\n')
        return response_body['message']['content']
    except requests.exceptions.RequestException as e:
        logging.error(f"Ollama API error: {str(e)}")
        raise
def initialize_db():
    """Initialize the SQLite database and create the necessary tables.

    Creates the ``email_states`` table (``email_id`` primary key plus a
    ``labels`` text column) in ``DATABASE_FILE`` if it does not exist yet.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS email_states (
                email_id TEXT PRIMARY KEY,
                labels TEXT
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the statement or commit fails.
        conn.close()
def store_email_state(email_id: str, labels: List[str]):
    """Stores the initial state of an email's labels in the database.

    Args:
        email_id: Gmail message id used as the primary key.
        labels: Label ids to record; stored as a JSON-encoded list. An
            existing row for the same ``email_id`` is overwritten.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute('''
            INSERT INTO email_states (email_id, labels) VALUES (?, ?)
            ON CONFLICT(email_id) DO UPDATE SET labels=excluded.labels;
        ''', (email_id, json.dumps(labels)))
        conn.commit()
    finally:
        # Release the connection even when the upsert or commit fails.
        conn.close()
def retrieve_email_state(email_id: str) -> List[str]:
    """Retrieves the stored state of an email's labels from the database.

    Args:
        email_id: Gmail message id to look up.

    Returns:
        The label ids recorded for the email, or an empty list when no row
        exists for ``email_id``.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        row = conn.execute(
            'SELECT labels FROM email_states WHERE email_id = ?', (email_id,)
        ).fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    return json.loads(row[0]) if row else []
def get_gmail_client() -> Resource:
    """Build an authorized Gmail API client.

    Credentials cached in ``TOKEN_FILE`` are reused while valid. Otherwise
    they are refreshed (when expired and a refresh token is available) or
    obtained through the local-server OAuth flow, and the result is written
    back to ``TOKEN_FILE``.
    """
    credentials = (
        Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
        if os.path.exists(TOKEN_FILE)
        else None
    )

    # Fast path: the cached token is still usable as-is.
    if credentials and credentials.valid:
        return build("gmail", "v1", credentials=credentials)

    if credentials and credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())
    else:
        oauth_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
        credentials = oauth_flow.run_local_server(port=8080)

    # Persist the new or refreshed token for the next run.
    with open(TOKEN_FILE, "w") as token_handle:
        token_handle.write(credentials.to_json())

    return build("gmail", "v1", credentials=credentials)
def get_last_run_time() -> datetime:
    """Gets the last run time from file or returns a default time.

    Returns:
        The timestamp stored under ``last_run`` in ``LAST_RUN_FILE``. When
        the file is missing, or is unreadable (invalid JSON, missing key,
        malformed timestamp), the default of seven days ago is returned so a
        damaged state file does not block every subsequent run.
    """
    default = datetime.now() - timedelta(days=7)  # Default to 7 days ago if no last run
    try:
        with open(LAST_RUN_FILE, 'r') as f:
            data = json.load(f)
        return datetime.fromisoformat(data['last_run'])
    except FileNotFoundError:
        return default
    except (KeyError, TypeError, ValueError) as error:
        # json.JSONDecodeError is a ValueError; TypeError covers a payload
        # that is not a mapping or a timestamp that is not a string.
        logging.warning(f"Ignoring unreadable {LAST_RUN_FILE}: {error}")
        return default
def update_last_run_time():
    """Record the current local time in ``LAST_RUN_FILE`` as the last run."""
    with open(LAST_RUN_FILE, 'w') as state_file:
        stamp = {'last_run': datetime.now().isoformat()}
        state_file.write(json.dumps(stamp))
def build_query(last_run: datetime) -> str:
    """Return the Gmail search query for unread mail after ``last_run``.

    Only the date portion of ``last_run`` is used, formatted as
    ``YYYY/MM/DD``.
    """
    # An earlier variant of this query also excluded messages carrying the
    # processed label ("-label:..."); that filter is currently disabled.
    cutoff_date = last_run.strftime('%Y/%m/%d')
    return "is:unread after:" + cutoff_date
def fetch_emails(gmail: Resource, query: str) -> List[dict]:
    """Fetches emails based on the given query.

    The Gmail API returns matches one page at a time; every page is followed
    via ``nextPageToken`` so that no matching message is silently dropped.

    Args:
        gmail: Authorized Gmail API client.
        query: Gmail search query string.

    Returns:
        The message stubs returned by the API across all pages, or an empty
        list when nothing matches.

    Raises:
        HttpError: If any list request fails (logged, then re-raised).
    """
    messages: List[dict] = []
    page_token = None
    try:
        while True:
            list_kwargs = {"userId": "me", "q": query}
            if page_token:
                list_kwargs["pageToken"] = page_token
            results = gmail.users().messages().list(**list_kwargs).execute()
            messages.extend(results.get("messages", []))
            page_token = results.get("nextPageToken")
            if not page_token:
                return messages
    except HttpError as error:
        logging.error(f"Failed to fetch emails: {error}")
        raise
def get_or_create_label(gmail: Resource, label_name: str) -> str:
    """Return the id of the label named ``label_name``, creating it if absent.

    Returns ``None`` (after logging the error) when the Gmail API call fails.
    """
    try:
        listing = gmail.users().labels().list(userId="me").execute()
        existing = next(
            (entry for entry in listing.get("labels", []) if entry["name"] == label_name),
            None,
        )
        if existing is not None:
            return existing["id"]

        # No label with that name yet: create one that is visible in both
        # the label list and the message list.
        new_label_body = {
            "name": label_name,
            "labelListVisibility": "labelShow",
            "messageListVisibility": "show"
        }
        created = gmail.users().labels().create(userId="me", body=new_label_body).execute()
        return created["id"]
    except HttpError as error:
        logging.error(f"An error occurred while managing label {label_name}: {error}")
        return None
def categorize_email_with_openai(email_content: str) -> str:
    """Categorizes an email using OpenAI's language model.

    Args:
        email_content: Text of the email (headers plus body) to classify.

    Returns:
        One of ``CATEGORY_LABELS`` when the model answers with exactly such
        a label, otherwise ``"Other"``. ``"Other"`` is also returned when
        the API call or the logging fails; the error is logged.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = f"""
Categorize the following email into one of these categories: {', '.join(CATEGORY_LABELS)}.
Respond with only the category name.
Email content:
{email_content}
"""
    try:
        start_time = time.time()
        response = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[
                {"role": "system", "content": "You are an AI assistant that categorizes emails."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=10,
            temperature=0.3
        )
        end_time = time.time()
        # Chat completions carry the reply in choices[0].message.content;
        # there is no `.text` attribute (that belongs to the legacy
        # completions API). Reading `.text` raised AttributeError, which the
        # handler below turned into "Other" for every single email.
        # `content` may be None, hence the `or ""`.
        category = (response.choices[0].message.content or "").strip()
        log_entry = {
            "request_timestamp": start_time,
            "response_timestamp": end_time,
            "duration": end_time - start_time,
            "request": {"prompt": prompt},
            "response": category
        }
        with open(llm_log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')
        return category if category in CATEGORY_LABELS else "Other"
    except Exception as e:
        logging.error(f"Error in OpenAI categorization: {e}")
        return "Other"
def categorize_email_with_ollama(email_content: str) -> str:
    """Categorizes an email using the local Ollama LLM.

    The email is wrapped in ``<Email>`` tags and followed by the
    instructions, all sent as a single user message. The model is asked to
    reply with a JSON object containing ``explanation`` and ``category``.

    Args:
        email_content: Text of the email (headers plus body) to classify.

    Returns:
        The ``category`` from the reply when it is one of
        ``CATEGORY_LABELS``, otherwise ``"Other"``. ``"Other"`` is also
        returned when the request fails or the reply cannot be parsed; the
        error is logged.
    """
    try:
        system_prompt = f"""You are an AI trained to categorize emails into predefined categories.
1. Provide a concise explanation for the selected category basd on the email.
2. Categorize the following email into one of these categories. Only use the provided category labels and their descriptions:
CATEGORY_LABELS = [
"Marketing" – Emails promoting products, services, or sales,
"Response Needed / High Priority" – Emails requiring urgent attention or action,
"Bills" – Emails related to payments or invoices,
"Subscriptions" – Emails about subscription services or renewals,
"Newsletters" – Regularly scheduled updates or informational emails,
"Personal" – Emails from friends, family, or personal contacts,
"Work" – Emails related to your job or professional activities,
"Events" – Emails about upcoming events or invitations,
"Travel" – Emails regarding travel plans, bookings, or itineraries,
"Receipts" – Emails containing proof of purchase or transaction details,
"Low quality" – Emails with spammy content or poor quality,
"Notifications" – Automated emails notifying about account activities or updates
].
Please respond in the following JSON format and restrict your response to the provided category labels:
{{
"explanation": "string",
"category": "string"
}}
"""
        prompt = f"""
<Email>
{email_content}
</Email>
"""
        response = call_ollama_api(prompt + system_prompt)
        # Diagnostics go through logging at DEBUG level rather than print(),
        # so the raw model reply is not written to stdout on normal runs.
        logging.debug(f"Ollama raw response ({type(response).__name__}): {response}")
        category = json.loads(response)['category']
        logging.debug(f"Ollama category: {category}")
        return category if category in CATEGORY_LABELS else "Other"
    except Exception as e:
        logging.error(f"Error in Ollama categorization: {str(e)}")
        return "Other"
def categorize_email(email_content: str) -> str:
    """Dispatch categorization to the backend named by ``LLM_SERVICE``.

    Returns the backend's category, or ``"Other"`` (after logging an error)
    when ``LLM_SERVICE`` is neither ``"OpenAI"`` nor ``"Ollama"``.
    """
    backends = {
        "OpenAI": categorize_email_with_openai,
        "Ollama": categorize_email_with_ollama,
    }
    backend = backends.get(LLM_SERVICE)
    if backend is None:
        logging.error("Invalid LLM service specified.")
        return "Other"
    return backend(email_content)
def _decode_body_data(data: str) -> str:
    """Decode one base64url-encoded Gmail body payload into text."""
    # Restore any stripped '=' padding, and replace undecodable bytes instead
    # of failing, so one badly encoded part does not discard the whole email.
    padded = data + "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _collect_body_text(payload: dict) -> str:
    """Concatenate the decoded text of a payload and all of its nested parts."""
    data = payload.get("body", {}).get("data")
    if data:
        return _decode_body_data(data)
    return "".join(_collect_body_text(part) for part in payload.get("parts", []))


def get_email_content(gmail: Resource, email_id: str) -> str:
    """Retrieves the content of an email.

    Args:
        gmail: Authorized Gmail API client.
        email_id: Id of the message to fetch.

    Returns:
        ``"Subject: ...\\nFrom: ...\\n\\n<body>"`` where the body is the
        decoded text of every part that carries inline data, at any nesting
        depth. A missing Subject or From header yields an empty value for
        that field. An empty string is returned when the message cannot be
        fetched or processed; the error is logged.
    """
    try:
        message = gmail.users().messages().get(userId="me", id=email_id, format="full").execute()
        payload = message["payload"]
        headers = payload.get("headers", [])
        # Use a default so a missing header does not raise StopIteration and
        # throw away the whole email.
        subject = next((h["value"] for h in headers if h["name"].lower() == "subject"), "")
        from_header = next((h["value"] for h in headers if h["name"].lower() == "from"), "")
        # Process the email body, considering both simple and multipart
        # emails. Single-part bodies are base64url-encoded just like parts
        # are, so they are decoded too.
        body = _collect_body_text(payload)
        # If no 'data' is found in any parts, body remains empty
        if not body:
            logging.warning(f"No content found in the body of email {email_id}")
        logging.info(f"Subject: {subject} | From: {from_header}")
        return f"Subject: {subject}\nFrom: {from_header}\n\n{body}"
    except HttpError as error:
        logging.error(f"An error occurred while retrieving email {email_id}: {error}")
        return ""
    except Exception as e:
        logging.error(f"Unexpected error when processing email {email_id}: {e}")
        return ""
def add_labels_to_email(gmail: Resource, email_id: str, label_ids: List[str]):
    """Attach the given label ids to one email.

    In ``PREVIEW_MODE`` the change is only logged. API failures are logged
    and not re-raised.
    """
    if PREVIEW_MODE:
        logging.info(f"Preview: Would add labels {label_ids} to email {email_id}")
        return

    try:
        message_api = gmail.users().messages()
        message_api.modify(userId="me", id=email_id, body={"addLabelIds": label_ids}).execute()
    except HttpError as error:
        logging.error(f"An error occurred while adding labels to email {email_id}: {error}")
    else:
        logging.info(f"Labels added to email {email_id}")
def remove_from_inbox(gmail: Resource, email_id: str):
    """Remove an email from the inbox.

    Removes the ``INBOX`` label from the message. In ``PREVIEW_MODE`` the
    change is only logged, so a preview run never modifies the mailbox. API
    failures are logged and not re-raised.

    Args:
        gmail: Authorized Gmail API client.
        email_id: Id of the message to archive.
    """
    if PREVIEW_MODE:
        logging.info(f"Preview: Would remove email {email_id} from the inbox")
        return
    try:
        gmail.users().messages().modify(
            userId='me',
            id=email_id,
            body={'removeLabelIds': ['INBOX']}
        ).execute()
        logging.info(f"Email {email_id} has been removed from the inbox.")
    except HttpError as error:
        logging.error(f"Failed to remove email {email_id} from the inbox: {error}")
def main():
    """Label unread emails received since the last run.

    For each matching email: record its current labels in the state
    database, ask the configured LLM for a category, add the processed label
    and the category label, and move Marketing / Newsletters / Low quality
    mail out of the inbox. The last-run timestamp is updated at the end.
    """
    # The email_states table must exist before store_email_state() is called;
    # without this the first insert fails with "no such table".
    initialize_db()
    gm = get_gmail_client()
    last_run = get_last_run_time()
    query = build_query(last_run)
    mails = fetch_emails(gm, query)
    if not mails:
        logging.info("No new unread emails found since the last run.")
        return
    logging.info(f"Found {len(mails)} new unread emails to process.")
    processed_label_id = get_or_create_label(gm, PROCESSED_LABEL)
    category_label_ids = {label: get_or_create_label(gm, label) for label in CATEGORY_LABELS}
    if not processed_label_id or not all(category_label_ids.values()):
        logging.error("Failed to get or create labels. Aborting...")
        return
    processed_count = 0
    for mail in mails:
        email_id = mail["id"]
        email_content = get_email_content(gm, email_id)
        if not email_content:
            # get_email_content() returns "" only on failure; do not spend an
            # LLM call on (or label) an email whose content is unknown.
            logging.warning(f"Skipping email {email_id}: content could not be retrieved.")
            continue
        try:
            original_labels = gm.users().messages().get(
                userId="me", id=email_id, format="minimal"
            ).execute().get('labelIds', [])
        except HttpError as error:
            # One unreadable message should not abort the remaining ones.
            logging.error(f"Failed to read labels of email {email_id}: {error}")
            continue
        store_email_state(email_id, original_labels)
        category = categorize_email(email_content)
        if category == 'Other':
            logging.warning(f"Could not categorize email {email_id}.")
            continue
        label_ids_to_add = [processed_label_id, category_label_ids[category]]
        add_labels_to_email(gm, email_id, label_ids_to_add)
        # Check if the email needs to be moved out of the inbox
        if category in ["Marketing", "Newsletters", "Low quality"]:
            remove_from_inbox(gm, email_id)
        processed_count += 1
        logging.info(f"Processed email {email_id} and categorized as {category}")
    # Report how many emails were actually labeled, not how many were found.
    logging.info(f"Processed {processed_count} of {len(mails)} emails.")
    update_last_run_time()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment