import { promises as fs } from 'fs';
import path from 'path';
import { authenticate } from '@google-cloud/local-auth';
import { google } from 'googleapis';
import { GoogleGenerativeAI, HarmCategory, HarmBlockThreshold } from "@google/generative-ai";
import { config } from 'dotenv';
import { subMonths, format } from 'date-fns';
import { parse } from 'csv-parse/sync'; // For reading the exclusion list

config(); // Load .env file
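
// Example .env file this script expects (illustrative value, not a real key):
//
//   GEMINI_API_KEY=AIza...your-key-here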

// --- Configuration ---
const SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'];
const TOKEN_PATH = path.join(process.cwd(), 'token.json');
const CREDENTIALS_PATH = path.join(process.cwd(), 'credentials.json');
const MONTHS_TO_SEARCH = 7;
const GEMINI_API_KEY = process.env.GEMINI_API_KEY;
// Choose your model. Gemma may require specific setup/API endpoints if not using the standard Gemini API.
// const GEMINI_MODEL_NAME = "gemini-1.5-flash-latest"; // A good default choice for balance
// const GEMINI_MODEL_NAME = "gemma-2"; // Check API compatibility/availability
const GEMINI_MODEL_NAME = "gemini-2.0-flash"; // Often available and reliable
const MAX_EMAILS_TO_PROCESS = 5000; // Limit total emails fetched from Gmail
const OUTPUT_FILE = 'job_applications_aggregated.json'; // Final aggregated output
const RAW_OUTPUT_FILE = 'job_applications_raw.jsonl'; // Incremental results (JSON Lines format)
const EXCLUDED_SENDERS_FILE = 'excluded_senders.csv';
const LLM_BATCH_SIZE = 10; // Number of emails per LLM request (adjust for token limits/performance)
const LLM_MAX_RETRIES = 3; // Max retries for LLM calls
const LLM_RETRY_DELAY_MS = 5000; // Base delay between retries (milliseconds)

let excludedSendersSet = new Set();
let processedMessageIds = new Set(); // Message IDs already processed and saved

// --- Helper Functions ---

const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));

async function loadSavedCredentialsIfExist() {
  try {
    const content = await fs.readFile(TOKEN_PATH);
    const credentials = JSON.parse(content.toString());
    return google.auth.fromJSON(credentials);
  } catch (err) {
    return null;
  }
}

async function saveCredentials(client) {
  try {
    const content = await fs.readFile(CREDENTIALS_PATH);
    const keys = JSON.parse(content.toString());
    const key = keys.installed || keys.web;
    const payload = JSON.stringify({
      type: 'authorized_user',
      client_id: key.client_id,
      client_secret: key.client_secret,
      refresh_token: client.credentials.refresh_token,
    });
    await fs.writeFile(TOKEN_PATH, payload);
    console.log('Token saved to', TOKEN_PATH);
  } catch (error) {
    console.error("Error saving credentials:", error);
  }
}

async function authorize() {
  let client = await loadSavedCredentialsIfExist();
  if (client) {
    return client;
  }
  try {
    client = await authenticate({
      scopes: SCOPES,
      keyfilePath: CREDENTIALS_PATH,
    });
    if (client.credentials) {
      await saveCredentials(client);
    }
    return client;
  } catch (error) {
    console.error("Authorization failed:", error);
    process.exit(1); // Exit if authorization fails
  }
}

async function getMessage(authClient, messageId) {
  const gmail = google.gmail({ version: 'v1', auth: authClient });
  try {
    const res = await gmail.users.messages.get({
      userId: 'me',
      id: messageId,
      format: 'full',
    });
    return res.data;
  } catch (err) {
    // Handle common errors like 404 Not Found gracefully
    if (err.code === 404) {
      console.warn(`Warning: Message ID ${messageId} not found (skipped).`);
      return null;
    }
    console.error(`Error fetching message ID ${messageId}:`, err.message);
    // Consider adding retry logic here too for transient network errors if needed
    return null;
  }
}

async function loadExclusionList() {
  try {
    const csvContent = await fs.readFile(EXCLUDED_SENDERS_FILE, 'utf-8');
    const records = parse(csvContent, {
      columns: ['SenderOrDomain'],
      skip_empty_lines: true,
      comment: '#',
      trim: true,
      from_line: 2, // Skip the header row
    });
    records.forEach(record => {
      if (record.SenderOrDomain) {
        excludedSendersSet.add(record.SenderOrDomain.toLowerCase());
      }
    });
    console.log(`Loaded ${excludedSendersSet.size} sender/domain exclusion rules from ${EXCLUDED_SENDERS_FILE}.`);
  } catch (err) {
    if (err.code === 'ENOENT') {
      console.warn(`${EXCLUDED_SENDERS_FILE} not found. No senders will be pre-filtered.`);
    } else {
      console.error(`Error loading exclusion list from ${EXCLUDED_SENDERS_FILE}:`, err);
    }
  }
}
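
// Illustrative excluded_senders.csv (line 1 is treated as a header and skipped,
// '#' starts a comment line, and entries are lowercased; an entry may be a full
// address or an "@domain" rule, matching the exclusion check in main()):
//
//   SenderOrDomain
//   noreply@jobalerts.example.com
//   @newsletters.example.com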

async function loadProcessedMessageIds() {
  try {
    const fileContent = await fs.readFile(RAW_OUTPUT_FILE, 'utf-8');
    const lines = fileContent.split('\n');
    lines.forEach(line => {
      if (line.trim()) {
        try {
          const data = JSON.parse(line);
          if (data.messageId) {
            processedMessageIds.add(data.messageId);
          }
        } catch (parseErr) {
          console.warn(`Skipping invalid line in ${RAW_OUTPUT_FILE}: ${line.substring(0, 50)}...`);
        }
      }
    });
    console.log(`Loaded ${processedMessageIds.size} previously processed message IDs from ${RAW_OUTPUT_FILE}.`);
  } catch (err) {
    if (err.code === 'ENOENT') {
      console.log(`${RAW_OUTPUT_FILE} not found. Starting fresh analysis.`);
    } else {
      console.error(`Error loading processed message IDs from ${RAW_OUTPUT_FILE}:`, err);
    }
  }
}

/**
 * Appends a single result object to the raw JSON Lines output file.
 * @param {object} resultData The job data object to save.
 */
async function appendRawResult(resultData) {
  try {
    const line = JSON.stringify(resultData) + '\n';
    await fs.appendFile(RAW_OUTPUT_FILE, line);
    processedMessageIds.add(resultData.messageId); // Add to set immediately after successful save
  } catch (err) {
    console.error(`Error appending result for message ${resultData.messageId} to ${RAW_OUTPUT_FILE}:`, err);
  }
}
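
// Illustrative line in job_applications_raw.jsonl (one JSON object per line; the
// fields follow the LLM schema below, plus sender/emailSubject/threadId attached
// during batch processing — the values here are made up):
// {"messageId":"18c...","isJobRelated":true,"companyName":"Example Corp","positionTitle":"Software Engineer","applicationStage":"Applied","stageDate":"2024-05-10","emailDate":"2024-05-10","notes":null,"sender":"Example Corp <jobs@example.com>","emailSubject":"We received your application","threadId":"18c..."}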

function getDomainFromEmail(email) {
  if (!email || typeof email !== 'string') return null;
  const emailMatch = email.match(/<?([^>]+@[^>]+)>?/);
  const actualEmail = emailMatch ? emailMatch[1] : email;
  const atIndex = actualEmail.lastIndexOf('@');
  if (atIndex === -1 || atIndex === actualEmail.length - 1) {
    if (actualEmail.startsWith('@')) {
      return actualEmail.substring(1).toLowerCase();
    }
    return null;
  }
  return actualEmail.substring(atIndex + 1).toLowerCase();
}
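
// Usage examples:
//   getDomainFromEmail('Jane Doe <jane@acme.com>') // -> 'acme.com'
//   getDomainFromEmail('jane@acme.com')            // -> 'acme.com'
//   getDomainFromEmail('not-an-email')             // -> null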

function base64UrlDecode(input) {
  if (!input) return '';
  let base64 = input.replace(/-/g, '+').replace(/_/g, '/');
  while (base64.length % 4) {
    base64 += '=';
  }
  try {
    return Buffer.from(base64, 'base64').toString('utf-8');
  } catch (e) {
    console.warn("Base64 decoding failed for a part.");
    return ''; // Return empty string on decode error
  }
}
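
// Gmail returns body data base64url-encoded (RFC 4648 §5): '-' and '_' replace
// '+' and '/', and padding is stripped. For example, 'SGVsbG8' decodes to
// 'Hello' once the '=' padding is restored above.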

function extractEmailBody(payload) {
  let body = '';
  if (!payload) return body;

  if (payload.mimeType === 'text/plain' && payload.body?.data) {
    body = base64UrlDecode(payload.body.data);
  } else if (payload.mimeType === 'text/html' && payload.body?.data) {
    const htmlContent = base64UrlDecode(payload.body.data);
    body = htmlContent
      .replace(/<style([\s\S]*?)<\/style>/gi, '')
      .replace(/<script([\s\S]*?)<\/script>/gi, '')
      .replace(/<[^>]*>/g, ' ')
      .replace(/(\r\n|\n|\r)/gm, " ") // Replace newlines with spaces
      .replace(/\s+/g, ' ')
      .trim();
  } else if (payload.parts) {
    let plainPart = payload.parts.find(part => part.mimeType === 'text/plain');
    if (plainPart?.body?.data) {
      body = base64UrlDecode(plainPart.body.data);
    } else {
      // If no plain text, try the first HTML part
      let htmlPart = payload.parts.find(part => part.mimeType === 'text/html');
      if (htmlPart) {
        body = extractEmailBody(htmlPart); // Recurse with the HTML part
      } else {
        // If still no body, recurse through all parts (may find nested text/plain)
        for (const part of payload.parts) {
          body = extractEmailBody(part);
          if (body) break;
        }
      }
    }
  }
  // Limit body size per email to avoid overly large LLM requests for the batch
  return body.substring(0, 8000);
}
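
// Illustrative shape of a typical multipart payload handled above (simplified;
// real Gmail payloads carry many more fields):
// {
//   mimeType: 'multipart/alternative',
//   parts: [
//     { mimeType: 'text/plain', body: { data: '<base64url>' } },
//     { mimeType: 'text/html',  body: { data: '<base64url>' } },
//   ],
// }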

function extractHeaders(headers, names) {
  const result = {};
  if (!headers) return result;
  const lowerCaseNames = names.map(n => n.toLowerCase());
  headers.forEach(header => {
    if (header.name && header.value) {
      const headerNameLower = header.name.toLowerCase();
      const index = lowerCaseNames.indexOf(headerNameLower);
      if (index !== -1) {
        result[names[index]] = header.value;
      }
    }
  });
  return result;
}
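
// Usage example (matching is case-insensitive; keys keep the casing you pass in):
//   extractHeaders(message.payload.headers, ['Subject', 'From'])
//   // -> { Subject: 'We received your application', From: 'Example Corp <jobs@example.com>' }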

/**
 * Analyzes a BATCH of emails using Gemini. Implements retries.
 * @param {Array<object>} emailBatch - Array of objects { messageId, subject, body, sender, date }
 * @returns {Promise<Array<object>|null>} Array of extracted data objects, or null on fatal error.
 */
async function analyzeEmailBatchWithLLM(emailBatch) {
  if (!GEMINI_API_KEY) {
    console.error("Error: GEMINI_API_KEY is not set in .env file.");
    return null;
  }
  if (!emailBatch || emailBatch.length === 0) {
    return [];
  }

  const genAI = new GoogleGenerativeAI(GEMINI_API_KEY);

  const generationConfig = {
    temperature: 0.2,
    topK: 1,
    topP: 1,
    maxOutputTokens: 4096, // Increase if needed for larger batches; watch model limits
  };
  const safetySettings = [
    { category: HarmCategory.HARM_CATEGORY_HARASSMENT, threshold: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE },
    { category: HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE },
    { category: HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE },
    { category: HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE },
  ];

  // Ensure the chosen model is compatible with the API key/project setup.
  // Pass generationConfig and safetySettings here so they actually apply to requests.
  const model = genAI.getGenerativeModel({ model: GEMINI_MODEL_NAME, generationConfig, safetySettings });

  // Prepare input for the prompt
  const emailInputsString = emailBatch.map((email, index) => `
--- Email ${index + 1} (messageId: ${email.messageId}) ---
Sender: ${email.sender}
Date: ${email.date}
Subject: ${email.subject}
Body: ${email.body.substring(0, 7000)}
--- End Email ${index + 1} ---
`).join('\n');

  const prompt = `
Analyze the following batch of emails. For EACH email provided, determine if it relates to a job application I sent out.
If it IS related to a job application (confirmation, rejection, interview, assessment, offer, etc.), extract the details.
If it is NOT related (newsletter, spam, personal, general job alert), indicate that.

**Input Emails Batch:**
${emailInputsString}

**Instructions:**
Respond ONLY with a single JSON array. Each element in the array MUST be a JSON object corresponding to one input email, in the SAME ORDER as the input.
Each JSON object MUST include the original "messageId" provided for that email.
Each JSON object MUST include the following fields:
- messageId: string (The messageId from the input)
- isJobRelated: boolean
- companyName: string or null
- positionTitle: string or null
- applicationStage: string (One of: "Applied", "Assessment", "Screening", "Interview Scheduled", "Interview Followup", "Offer", "Rejection", "Withdrawal Confirmation", "On Hold", "Keep In Touch", "Informational", "Unknown") or null
- stageDate: string (YYYY-MM-DD) or null
- emailDate: string (YYYY-MM-DD from input)
- notes: string or null

**Example JSON Object (Job Related):**
{
  "messageId": "...",
  "isJobRelated": true,
  "companyName": "Example Corp",
  "positionTitle": "Software Engineer",
  "applicationStage": "Assessment",
  "stageDate": "2024-05-15",
  "emailDate": "2024-05-10",
  "notes": "Received coding challenge link."
}

**Example JSON Object (Not Job Related):**
{
  "messageId": "...",
  "isJobRelated": false,
  "companyName": null,
  "positionTitle": null,
  "applicationStage": null,
  "stageDate": null,
  "emailDate": "2024-05-09",
  "notes": null
}

**Your JSON Array Response:**
`;

  let retries = 0;
  while (retries <= LLM_MAX_RETRIES) {
    try {
      console.log(`  Attempting LLM analysis for batch of ${emailBatch.length} emails (Attempt ${retries + 1}/${LLM_MAX_RETRIES + 1})`);
      const result = await model.generateContent(prompt);
      const response = result.response;
      const responseText = response.text()
        .replace(/^```json\s*/, '')
        .replace(/\s*```$/, '')
        .trim(); // Trim whitespace, which can sometimes cause issues

      // console.log("LLM Raw Response:", responseText); // Debugging

      // Validate that it's likely JSON before parsing
      if (!responseText.startsWith('[') || !responseText.endsWith(']')) {
        console.error("LLM response does not appear to be a JSON array:", responseText.substring(0, 100) + "...");
        throw new Error("Invalid JSON array format from LLM"); // Throw to trigger retry or failure
      }

      const parsedDataArray = JSON.parse(responseText);

      if (!Array.isArray(parsedDataArray)) {
        console.error("LLM response parsed, but is not an array:", parsedDataArray);
        throw new Error("LLM response is not an array");
      }

      // Basic validation of results
      if (parsedDataArray.length !== emailBatch.length) {
        console.warn(`LLM returned ${parsedDataArray.length} results for a batch of ${emailBatch.length}. Will try to match by messageId.`);
        // Continue processing; matching by messageId below handles discrepancies
      }

      // Add sender/subject back for context (optional here, since raw saving is primary)
      parsedDataArray.forEach(item => {
        const originalEmail = emailBatch.find(e => e.messageId === item.messageId);
        if (originalEmail) {
          item.sender = originalEmail.sender;
          item.emailSubject = originalEmail.subject;
        }
      });

      return parsedDataArray; // Success

    } catch (error) {
      console.error(`Error calling Gemini API or parsing batch response (Attempt ${retries + 1}):`, error.message);

      // Check for retryable errors (rate limits, server errors, malformed output).
      // The Gemini API surfaces these via status codes or messages.
      const isRetryable = error.message?.includes('429') || // Too Many Requests
        error.message?.includes('500') || // Internal Server Error
        error.message?.includes('503') || // Service Unavailable
        error.message?.includes('timed out') ||
        error.message?.includes('network error') ||
        error instanceof SyntaxError || // JSON.parse failure on malformed output
        error.message?.includes('Invalid JSON array format'); // Retry format issues too

      if (isRetryable && retries < LLM_MAX_RETRIES) {
        retries++;
        const delayTime = LLM_RETRY_DELAY_MS * Math.pow(2, retries - 1); // Exponential backoff
        console.log(`  Retrying in ${delayTime / 1000} seconds...`);
        await delay(delayTime);
      } else {
        console.error(`  LLM analysis failed permanently for batch after ${retries} retries.`);
        if (error.response && error.response.promptFeedback) {
          console.error("  Safety Feedback:", error.response.promptFeedback);
        }
        return null; // Indicate permanent failure for this batch
      }
    }
  }
  return null; // Unreachable if retries are handled correctly; acts as a safeguard
}

async function listMessages(authClient, query) {
  const gmail = google.gmail({ version: 'v1', auth: authClient });
  let messages = [];
  let pageToken = undefined;
  let count = 0;
  let apiCallCount = 0;
  const MAX_API_CALLS = 100; // Safety break for pagination loops

  console.log(`Fetching emails with query: ${query}`);

  try {
    do {
      apiCallCount++;
      if (apiCallCount > MAX_API_CALLS) {
        console.warn("Stopping message list due to reaching max API call limit.");
        break;
      }
      const res = await gmail.users.messages.list({
        userId: 'me',
        q: query,
        maxResults: 100,
        pageToken: pageToken,
      });

      if (res.data.messages) {
        // Filter out already processed messages *before* adding to the list
        const newMessages = res.data.messages.filter(m => !processedMessageIds.has(m.id));
        if (newMessages.length < res.data.messages.length) {
          console.log(`  (Skipped ${res.data.messages.length - newMessages.length} already processed messages on this page)`);
        }
        messages = messages.concat(newMessages);
        count += newMessages.length;
        console.log(`Fetched ${count} new message IDs... (Total checked: approx. ${count + processedMessageIds.size})`);
      }

      pageToken = res.data.nextPageToken;

      if (count >= MAX_EMAILS_TO_PROCESS) { // Check against NEW messages fetched
        console.warn(`Reached MAX_EMAILS_TO_PROCESS limit (${MAX_EMAILS_TO_PROCESS}) for new emails. Stopping fetch.`);
        pageToken = undefined;
      }
      // Add a small delay between pages to be nicer to the API
      if (pageToken) await delay(200);

    } while (pageToken);

    console.log(`Total new message IDs to process: ${messages.length}`);
    return messages.slice(0, MAX_EMAILS_TO_PROCESS);

  } catch (err) {
    console.error('The API returned an error while listing messages: ' + err);
    return [];
  }
}

// --- Main Execution ---
async function main() {
  console.log("Starting Gmail Job Application Tracker...");

  if (!GEMINI_API_KEY) {
    console.error("FATAL: GEMINI_API_KEY is not set in the .env file. Exiting.");
    process.exit(1);
  }

  // 0. Load Exclusions and Previously Processed IDs
  await loadExclusionList();
  await loadProcessedMessageIds(); // Load IDs from RAW_OUTPUT_FILE

  // 1. Authorize
  const auth = await authorize();
  console.log("Authorization successful.");

  // 2. Calculate Date Range & Build Query
  const startDate = subMonths(new Date(), MONTHS_TO_SEARCH);
  const formattedStartDate = format(startDate, 'yyyy/MM/dd');
  // Refined query. Gmail search treats adjacent terms as an implicit AND, so no
  // "AND" keyword is needed; loosen or drop the category filters if they exclude too much.
  const query = `in:inbox to:me after:${formattedStartDate} (job OR application OR apply OR applied OR interview OR career OR hiring OR assessment OR recruit OR offer OR position OR "next steps") -category:promotions -category:social`;
  // const query = `in:inbox to:me after:${formattedStartDate}`; // Simpler query if needed
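
  // With MONTHS_TO_SEARCH = 7 and a run date of 2024-12-10 (illustrative), the
  // query above renders roughly as:
  //   in:inbox to:me after:2024/05/10 (job OR application OR ...) -category:promotions -category:social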

  // 3. List Messages (filters previously processed IDs internally)
  const messageList = await listMessages(auth, query);
  if (!messageList || messageList.length === 0) {
    console.log("No new messages found matching the criteria to process.");
    // Optional: proceed to aggregation if you want to re-aggregate existing raw data
    // await aggregateAndSaveResults();
    return;
  }
  console.log(`Found ${messageList.length} new emails to analyze.`);

  // 4. Fetch, Prepare Batches, and Analyze Emails
  let currentBatch = [];
  let processedCount = 0;
  let skippedExcludedCount = 0;
  let failedFetchCount = 0;
  let llmAnalyzedCount = 0;
  let llmJobRelatedCount = 0;

  for (let i = 0; i < messageList.length; i++) {
    const messageMeta = messageList[i];
    processedCount++;
    console.log(`Processing new email ${processedCount}/${messageList.length} (ID: ${messageMeta.id})`);

    // Check again here just in case (though listMessages should handle it)
    if (processedMessageIds.has(messageMeta.id)) {
      console.log(`  -> Skipping already processed message ID: ${messageMeta.id}`);
      continue;
    }

    // Fetch message details
    const message = await getMessage(auth, messageMeta.id);
    if (!message || !message.payload) {
      console.log(`  -> Skipping message ${messageMeta.id} due to fetch error or missing payload.`);
      failedFetchCount++;
      continue;
    }

    // Extract headers for the exclusion check
    const headers = extractHeaders(message.payload.headers, ['Subject', 'From', 'Date']);
    const sender = headers['From'] || '';

    // Exclusion check: match the full sender string, the bare address, or an @domain rule
    let isExcluded = false;
    if (sender && excludedSendersSet.size > 0) {
      const senderLower = sender.toLowerCase();
      const senderDomain = getDomainFromEmail(sender);
      const bareAddressMatch = sender.match(/<([^>]+)>/);
      if (excludedSendersSet.has(senderLower) ||
          (senderDomain && excludedSendersSet.has(`@${senderDomain}`)) ||
          (bareAddressMatch && excludedSendersSet.has(bareAddressMatch[1].toLowerCase()))) {
        isExcluded = true;
      }
    }

    if (isExcluded) {
      console.log(`  -> Skipping email from excluded sender: ${sender}`);
      skippedExcludedCount++;
      // Persist a minimal record so the exclusion survives restarts
      // (processedMessageIds is rebuilt from RAW_OUTPUT_FILE on startup;
      // an in-memory Set alone would not prevent refetching next run).
      await appendRawResult({ messageId: messageMeta.id, isJobRelated: false, excluded: true, sender });
      continue;
    }

    // Extract remaining data for the LLM
    const subject = headers['Subject'] || '';
    // Guard against unparseable Date headers (format() throws on an Invalid Date)
    const parsedDate = headers['Date'] ? new Date(headers['Date']) : null;
    const dateReceived = parsedDate && !isNaN(parsedDate.getTime())
      ? format(parsedDate, 'yyyy-MM-dd')
      : 'Unknown Date';
    const body = extractEmailBody(message.payload);

    // Add to batch
    currentBatch.push({
      messageId: messageMeta.id,
      threadId: messageMeta.threadId, // Keep threadId
      subject,
      body,
      sender,
      date: dateReceived,
    });

    // If the batch is full or this is the last message, process the batch
    if (currentBatch.length >= LLM_BATCH_SIZE || i === messageList.length - 1) {
      console.log(`\n--- Processing Batch of ${currentBatch.length} emails ---`);
      const batchResults = await analyzeEmailBatchWithLLM(currentBatch);

      if (batchResults) {
        console.log(`--- LLM analysis complete for batch. Received ${batchResults.length} results. ---`);
        // Process results and save incrementally
        for (const result of batchResults) {
          // Find the original email data (in case order changed or items are missing)
          const originalEmail = currentBatch.find(e => e.messageId === result.messageId);
          if (!originalEmail) {
            console.warn(`LLM result for unknown messageId ${result.messageId} in batch, skipping.`);
            continue;
          }

          llmAnalyzedCount++; // Count emails analyzed by the LLM

          if (result.isJobRelated) {
            llmJobRelatedCount++;
            // Basic validation
            result.companyName = result.companyName || "Unknown";
            result.positionTitle = result.positionTitle || "Unknown";
            result.applicationStage = result.applicationStage || "Unknown";
            result.threadId = originalEmail.threadId; // Ensure threadId is added

            console.log(`  -> Job Related: ${result.companyName} - ${result.positionTitle} (${result.applicationStage}) (ID: ${result.messageId})`);
            await appendRawResult(result); // Save immediately
          } else {
            // console.log(`  -> Not Job Related (ID: ${result.messageId})`);
            // Persist the negative result too, so it is not re-analyzed on the
            // next run (aggregation filters out isJobRelated: false entries).
            await appendRawResult(result);
          }
        }
      } else {
        console.error(`--- LLM analysis FAILED for batch. Skipping ${currentBatch.length} emails in this batch. ---`);
        // Optionally: mark these message IDs for reprocessing later. For now, just skip.
      }
      currentBatch = []; // Reset batch
      console.log("--- Batch processing finished ---\n");
    }
    // Optional delay between fetching individual messages if needed
    // await delay(100);
  }

  console.log("\n--- Analysis Summary ---");
  console.log(`Total new emails matching query: ${messageList.length}`);
  console.log(`Emails skipped (fetch error): ${failedFetchCount}`);
  console.log(`Emails skipped (exclusion list): ${skippedExcludedCount}`);
  console.log(`Emails sent for LLM analysis: ${llmAnalyzedCount}`);
  console.log(`Emails identified as Job Related: ${llmJobRelatedCount}`);
  console.log(`Total message IDs now marked processed: ${processedMessageIds.size}`);
  console.log(`Results saved incrementally in: ${RAW_OUTPUT_FILE}`);

  // 5. Aggregate all results from the RAW file for final output
  await aggregateAndSaveResults();

  console.log("\nScript finished.");
}

/**
 * Reads all data from the raw JSONL file and performs aggregation.
 */
async function aggregateAndSaveResults() {
  console.log(`\nAggregating results from ${RAW_OUTPUT_FILE}...`);
  const allJobData = [];
  try {
    const fileContent = await fs.readFile(RAW_OUTPUT_FILE, 'utf-8');
    const lines = fileContent.split('\n');
    lines.forEach(line => {
      if (line.trim()) {
        try {
          const data = JSON.parse(line);
          // Only aggregate valid job entries (this also drops the persisted
          // "not job related" and exclusion records)
          if (data.isJobRelated && data.messageId) {
            allJobData.push(data);
          }
        } catch (parseErr) {
          // Ignore lines that cannot be parsed during aggregation
        }
      }
    });
  } catch (err) {
    if (err.code === 'ENOENT') {
      console.log(`No raw data file (${RAW_OUTPUT_FILE}) found to aggregate. Nothing to save.`);
      return;
    } else {
      console.error(`Error reading raw data file ${RAW_OUTPUT_FILE} for aggregation:`, err);
      return;
    }
  }

  if (allJobData.length === 0) {
    console.log("No job-related data found in the raw file to aggregate.");
    return;
  }

  console.log(`Aggregating ${allJobData.length} job entries...`);

  // Group by Company + Position (case-insensitive) to find the latest status
  const aggregatedResults = {};
  allJobData.sort((a, b) => new Date(a.emailDate) - new Date(b.emailDate)); // Sort by email date, ascending

  for (const entry of allJobData) {
    const key = `${entry.companyName?.toLowerCase()?.trim() || 'unknown'}-${entry.positionTitle?.toLowerCase()?.trim() || 'unknown'}`;
    if (!aggregatedResults[key]) {
      aggregatedResults[key] = {
        companyName: entry.companyName,
        positionTitle: entry.positionTitle,
        stages: [],
        firstSeen: entry.emailDate,
        lastSeen: entry.emailDate,
        latestStage: entry.applicationStage,
        latestStageDate: entry.stageDate || entry.emailDate, // Fallback stage date
        relatedEmailCount: 0,
        threadIds: new Set(), // Track unique associated thread IDs
      };
    }
    // Update latest info based on emailDate
    if (new Date(entry.emailDate) >= new Date(aggregatedResults[key].lastSeen)) {
      aggregatedResults[key].lastSeen = entry.emailDate;
      aggregatedResults[key].latestStage = entry.applicationStage;
      // Prefer a specific stageDate if available and plausible; otherwise use emailDate
      aggregatedResults[key].latestStageDate = entry.stageDate && new Date(entry.stageDate) >= new Date(aggregatedResults[key].firstSeen) ? entry.stageDate : entry.emailDate;
    }

    aggregatedResults[key].relatedEmailCount++;
    if (entry.threadId) {
      aggregatedResults[key].threadIds.add(entry.threadId);
    }
    aggregatedResults[key].stages.push({
      stage: entry.applicationStage,
      stageDate: entry.stageDate || entry.emailDate, // Fallback stage date
      emailDate: entry.emailDate,
      notes: entry.notes,
      subject: entry.emailSubject, // Carry the subject over if needed
      messageId: entry.messageId,
    });
  }

  // Convert Set back to Array for JSON output
  const finalOutput = Object.values(aggregatedResults).map(item => ({
    ...item,
    threadIds: Array.from(item.threadIds),
  }));

  // Sort final output by last-seen date, descending
  finalOutput.sort((a, b) => new Date(b.lastSeen) - new Date(a.lastSeen));

  // Save Aggregated Results
  try {
    await fs.writeFile(OUTPUT_FILE, JSON.stringify(finalOutput, null, 2));
    console.log(`\nAggregated results saved to ${OUTPUT_FILE}`);
  } catch (err) {
    console.error("Error saving aggregated results to file:", err);
    console.log("\n--- Aggregated Results (JSON) ---");
    console.log(JSON.stringify(finalOutput, null, 2)); // Log to console as a fallback
  }
}
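
// Illustrative shape of one entry in job_applications_aggregated.json
// (values are made up, not real output):
// {
//   "companyName": "Example Corp",
//   "positionTitle": "Software Engineer",
//   "stages": [
//     { "stage": "Applied", "stageDate": "2024-05-10", "emailDate": "2024-05-10",
//       "notes": null, "subject": "We received your application", "messageId": "18c..." }
//   ],
//   "firstSeen": "2024-05-10",
//   "lastSeen": "2024-05-15",
//   "latestStage": "Assessment",
//   "latestStageDate": "2024-05-15",
//   "relatedEmailCount": 2,
//   "threadIds": ["18c..."]
// }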

// --- Run the main function ---
main().catch(error => {
  console.error("An uncaught error occurred in the main execution:", error);
  process.exit(1); // Exit with an error code on unhandled rejection
});