|
import datetime
import json
import os

import requests
from msal import PublicClientApplication, SerializableTokenCache
|
|
|
# Application (client) ID from the Azure App Registration; replace with your own.
# This script uses the public-client flow, so no client secret is configured.
CLIENT_ID = "a9641399-2aad-43f8-aac1-5bcccbcc004b"

# The "common" authority supports both personal and work accounts.
AUTHORITY = "https://login.microsoftonline.com/common"

# Permission requested when acquiring a token.
SCOPES = ["https://graph.microsoft.com/Mail.Read"]

# Base URL for Microsoft Graph requests.
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
|
|
|
def _write_private_text(path: str, text: str) -> None:
    """Write *text* to *path* as UTF-8, creating the file with owner-only permissions."""
    # The 0o600 mode applies only when the file is created; an existing file
    # keeps whatever permissions it already has.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(text)


def get_access_token() -> str:
    """
    Acquires a valid access token to make requests to the Microsoft Graph API.

    A token cache is loaded from ``token_cache.json`` (when present) and handed
    to the MSAL application so that a previously signed-in account can be
    reused silently. If no token can be obtained silently, an interactive
    login is started. On success the full MSAL result is also written to
    ``auth.json``.

    Both files contain credentials and are created with owner-only
    permissions; keep them out of version control.

    Returns:
        str: The access token obtained from Azure AD.

    Raises:
        RuntimeError: If the result returned by MSAL has no ``access_token``.
    """
    cache_path = "token_cache.json"

    # Without a persisted cache every run starts with zero known accounts and
    # the silent path below can never succeed.
    cache = SerializableTokenCache()
    try:
        with open(cache_path, encoding="utf-8") as cache_file:
            cache.deserialize(cache_file.read())
    except FileNotFoundError:
        pass  # First run: nothing cached yet.
    except ValueError:
        pass  # Unreadable cache: start empty and fall back to interactive login.

    # Create a public client application backed by the persisted cache
    app = PublicClientApplication(CLIENT_ID, authority=AUTHORITY, token_cache=cache)

    # Try acquiring token silently (useful for token cache reuse)
    accounts = app.get_accounts()
    result = app.acquire_token_silent(SCOPES, account=accounts[0]) if accounts else None

    # If no token found, initiate interactive login
    if not result:
        print("No cached token found. Initiating interactive login...")
        result = app.acquire_token_interactive(scopes=SCOPES, timeout=60)

    # Persist new or refreshed tokens so the next run can sign in silently.
    if cache.has_state_changed:
        _write_private_text(cache_path, cache.serialize())

    if "access_token" not in result:
        raise RuntimeError(f"Failed to obtain access token: {result.get('error_description')}")

    _write_private_text("auth.json", json.dumps(result, indent=4))
    return result["access_token"]
|
|
|
def get_emails_today(access_token: str) -> list[str]:
    """
    Returns the subjects of messages received since 00:00 UTC today.

    Requests the ``/me/messages`` collection from Microsoft Graph, newest
    first, and follows ``@odata.nextLink`` until no further page is offered,
    so results beyond the first page are included.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        list[str]: The ``subject`` value of every returned message that has one.

    Raises:
        RuntimeError: If any page request returns a status code other than 200.
    """
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    today = datetime.datetime.now(datetime.timezone.utc).date().isoformat()
    query_params = {
        "$filter": f"receivedDateTime ge {today}T00:00:00Z",
        "$select": "subject",
        "$orderby": "receivedDateTime desc",
    }
    headers = {"Authorization": f"Bearer {access_token}"}

    url = f"{GRAPH_API_ENDPOINT}/me/messages"
    subjects: list[str] = []
    while url:
        response = requests.get(url, headers=headers, params=query_params, timeout=30)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch emails: {response.status_code} - {response.text}")

        payload = response.json()
        subjects.extend(
            message["subject"] for message in payload.get("value", []) if "subject" in message
        )

        # The next-page URL already embeds the query options, so they must not
        # be sent again on follow-up requests.
        url = payload.get("@odata.nextLink")
        query_params = None

    return subjects
|
|
|
|
|
def main():
    """Sign in, fetch the subjects of today's messages, and print them numbered from 1."""
    # Both network steps finish before anything is printed.
    numbered_subjects = enumerate(get_emails_today(get_access_token()), start=1)

    print("Today's Emails:")
    for position, subject in numbered_subjects:
        print(f"{position}: {subject}")


# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()