Skip to content

Instantly share code, notes, and snippets.

@tomredman
Created August 13, 2025 23:11
Show Gist options
  • Save tomredman/e543d120a4fdb22fa24d48319b917316 to your computer and use it in GitHub Desktop.
Save tomredman/e543d120a4fdb22fa24d48319b917316 to your computer and use it in GitHub Desktop.
Sending batch emails with Convex & Resend
/**
* Builds an email payload for sending campaign emails through an email service provider.
*
* This function takes campaign details and recipient information to construct a complete
* email payload that includes:
* - Sender information (from name and email)
* - Recipient email
* - Subject line
* - Reply-to address
* - Unsubscribe URL (merged into both HTML and plain text content)
* - Email tracking headers and tags for analytics
* - Both HTML and plain text versions of the email content
*
* @param campaign - The campaign document containing email content and settings
* @param teamId - The ID of the team sending the campaign
* @param audience - The target audience information (Mailchimp list or segment)
* @param to - The recipient's email address
* @param contactId - The recipient's contact ID for unsubscribe tracking
*
* @returns A formatted email payload conforming to the email service provider's requirements
*/
export const buildEmailPayload = ({
to,
teamId,
campaign,
contactId,
generatedCampaign,
executionId,
overrides,
fubPersonId,
fubUserId,
}: {
to: string;
teamId: Id<"teams">;
contactId: Id<"fubContacts"> | string;
fubPersonId: string;
fubUserId: string;
campaign: Doc<"campaigns">;
generatedCampaign: Doc<"generatedCampaigns">;
executionId: Id<"campaignExecution">;
overrides?: {
from?: string;
subject?: string;
html?: string;
plainText?: string;
replyTo?: string;
};
}) => {
const emailContent = generatedCampaign.emailContent;
if (!emailContent) {
throw new Error("Campaign email content is undefined");
}
const unsubscribeUrl = getUnsubscribeUrl(contactId);
let mergedHtml = overrides?.html || emailContent.html;
let mergedPlainText = overrides?.plainText || emailContent.plainText;
const unsubscribeNeedles = [
"https://{{unsubscribe_url}}",
"http://{{unsubscribe_url}}",
"https://{{unsubscribe}}",
"http://{{unsubscribe}}",
"{{unsubscribe_url}}",
"{{unsubscribe}}",
];
let foundUnsubscribeUrl = false;
for (const needle of unsubscribeNeedles.sort((a, b) => b.length - a.length)) {
if (emailContent.html.includes(needle)) {
mergedHtml = mergedHtml.replace(needle, unsubscribeUrl);
foundUnsubscribeUrl = true;
break;
}
}
if (!foundUnsubscribeUrl) {
mergedHtml = mergedHtml.replace(
"</body>",
`<a style="color: #666; text-decoration: none; margin-top: 12px; font-size: 12px;" href="${unsubscribeUrl}">Unsubscribe</a></body>`
);
}
let foundUnsubscribeUrlInPlainText = false;
for (const needle of unsubscribeNeedles.sort((a, b) => b.length - a.length)) {
if (emailContent.plainText.includes(needle)) {
mergedPlainText = mergedPlainText.replace(needle, unsubscribeUrl);
foundUnsubscribeUrlInPlainText = true;
break;
}
}
if (!foundUnsubscribeUrlInPlainText) {
// Add the unsubscribe URL to the plain text
mergedPlainText = `${mergedPlainText}\n\nUnsubscribe: ${unsubscribeUrl}`;
}
const payload: SendEmailOptions = {
from: `${emailContent.fromName} <${emailContent.fromEmail}>`,
to: to,
subject: overrides?.subject || emailContent.subject,
replyTo: overrides?.replyTo
? [overrides.replyTo]
: emailContent.replyTo
? [emailContent.replyTo]
: undefined,
headers: [
{
name: "X-Team-ID",
value: teamId,
},
{
name: "X-App-ID",
value: "journey",
},
{
name: "X-Contact-ID",
value: contactId,
},
{
name: "X-Person-ID",
value: fubPersonId,
},
{
name: "X-Fub-User-ID",
value: fubUserId,
},
{
name: "X-Campaign-ID",
value: campaign._id,
},
{
name: "X-Execution-ID",
value: executionId,
},
{
name: "X-Entity-Ref-ID",
value: campaign._id,
},
{
name: "X-Payload-Type",
value: "live",
},
{
name: "X-Environment",
value: process.env.ENVIRONMENT!,
},
{
name: "List-Unsubscribe",
value: `<${unsubscribeUrl}>`,
},
],
html: mergedHtml,
text: mergedPlainText,
};
return payload;
};
import { Infer, v } from "convex/values";
import { internalQuery, MutationCtx, QueryCtx } from "../_generated/server";
import { Doc, Id } from "../_generated/dataModel";
import { paginationOptsValidator } from "convex/server";
import { getContactsForListFn } from "../fub/contacts/queries";
/**
* Retrieves the next batch of contacts for a campaign based on its audience settings.
* Supports Mailchimp lists and segments and FUB lists as audience sources.
*
* @param campaignId - The ID of the campaign
* @param audience - The audience configuration (list or segment)
* @param paginationOpts - Options for paginating through contacts
* @returns An object containing:
* - page: Array of contact documents
* - isDone: Boolean indicating if all contacts have been processed
* - continueCursor: Cursor for fetching the next batch
*/
export const getNextBatchOfContacts = async (
ctx: QueryCtx | MutationCtx,
args: {
teamId: Id<"teams">;
listId: Id<"fubLists">;
campaignId: Id<"campaigns">;
paginationOpts: Infer<typeof paginationOptsValidator>;
}
): Promise<{
page: Doc<"fubContacts">[];
isDone: boolean;
continueCursor: string | null;
}> => {
return await getContactsForListFn(ctx, {
listId: args.listId,
paginationOpts: args.paginationOpts,
});
};
export const getNextBatchOfContactsQuery = internalQuery({
args: {
teamId: v.id("teams"),
listId: v.id("fubLists"),
campaignId: v.id("campaigns"),
paginationOpts: paginationOptsValidator,
},
handler: async (ctx, args) => {
return await getNextBatchOfContacts(ctx, args);
},
});
import { Infer, v } from "convex/values";
import { SendEmailOptions } from "@convex-dev/resend";
import { systemGet } from "../utils/system";
import { Id } from "../_generated/dataModel";
import { internal } from "../_generated/api";
import { Doc } from "../_generated/dataModel";
import { paginationOptsValidator } from "convex/server";
import { getNextBatchOfContacts } from "./campaignContacts";
import { validateSendCampaignArgs } from "./campaignValidator";
import { internalMutation, MutationCtx } from "../_generated/server";
import { buildEmailPayload } from "../actionPlanGenerators/dealOfTheWeek/emailGeneratorCampaign";
import { createFubCampaignIfNotExists } from "../fub/em/mutations";
import { getOrThrow } from "../get";
import { isUniqueSendFn } from "../emails/mutations";
/**
* Main function that sends a campaign to a batch of contacts.
* This function:
* 1. Fetches a batch of contacts from the list
* 2. Creates email payloads for each contact
* 3. Schedules the emails to be sent via Resend
* 4. Either continues with the next batch or marks the campaign as complete
*
* Uses pagination to process large contact lists in manageable chunks.
*/
export const sendCampaignToContacts = internalMutation({
args: {
currentListIndex: v.number(),
executionId: v.id("campaignExecution"),
listIds: v.array(v.id("fubLists")),
pageNumber: v.number(), // only used for the idempotency key
paginationOpts: paginationOptsValidator,
totalSent: v.optional(v.number()),
warmMax: v.number(),
},
handler: async (ctx, args) => {
try {
const { execution, campaign, generatedCampaign } =
await validateSendCampaignArgs(ctx, args);
if (execution.status !== "sending") {
throw new Error(
`This campaign is not in sending status: ${execution._id} ${execution.status}`
);
}
const listId = args.listIds[args.currentListIndex];
const totalSent = args.totalSent || 0;
let numToFetch = args.paginationOpts.numItems;
/**
* Enforce warm-up limit to gradually increase sending volume.
* This prevents sending too many emails at once which could:
* - Trigger spam filters
* - Overwhelm recipient mail servers
* - Damage sender reputation
* The warmMax limit increases by 50% after each successful campaign.
*/
if (totalSent + numToFetch > args.warmMax) {
numToFetch = args.warmMax - totalSent;
}
const {
page: contacts,
isDone,
continueCursor,
} = await getNextBatchOfContacts(ctx, {
campaignId: execution.campaignId,
teamId: campaign.teamId,
listId: listId,
paginationOpts: {
numItems: numToFetch,
cursor: args.paginationOpts.cursor,
},
});
let startedSendingAt = execution.startedSendingAt;
if (execution.startedSendingAt === 0 || !execution.startedSendingAt) {
startedSendingAt = Date.now();
}
const payloads: SendEmailOptions[] = [];
/**
* Create a FollowUpBoss email campaign record if it doesn't exist.
* This ensures:
* - Proper tracking and analytics setup
* - Integration with CRM for lead management
* - Unified reporting across platforms
*/
await createFubCampaignIfNotExists(ctx, {
executionId: execution._id,
generatedCampaign: generatedCampaign,
campaign: campaign,
});
const user = await getOrThrow(ctx, campaign.createdBy);
/**
* Build personalized email payloads for each contact in the batch.
* Each payload includes:
* - Dynamic content based on campaign template
* - Contact-specific personalization
* - Tracking pixels and analytics
* - Unsubscribe links
*/
for (const contact of contacts) {
/**
* Check if the email has already been sent to this contact.
* If it has, we skip it.
*
* This could happen if a contact is part of multiple lists for the same agent.
*/
const isUniqueSend = await isUniqueSendFn(ctx, {
to: contact.emailAddress,
executionId: execution._id,
});
if (isUniqueSend) {
const payload = await buildEmailPayload({
campaign,
contactId: contact._id,
fubPersonId: contact.id.toString(),
fubUserId: user.fubUser?.id.toString() || "0",
teamId: campaign.teamId,
to: contact.emailAddress,
executionId: execution._id,
generatedCampaign: generatedCampaign,
});
payloads.push(payload);
}
}
/**
* Generate a unique idempotency key to prevent duplicate sends.
* Format: send/{executionId}/v{version}/page-{pageNumber}
* This ensures that retries don't result in duplicate emails.
*/
const idempotencyKey = `send/${execution._id}/v${execution.version}/page-${args.pageNumber}`;
/**
* Update execution status to track sending progress.
* This allows monitoring and ensures proper state management.
*/
await ctx.db.patch(execution._id, {
status: "sending",
startedSendingAt,
});
/**
* Send the batch of emails through the Resend service.
* Returns counts of successfully sent and skipped emails.
*/
const { sentInBatch, skippedInBatch } = await ctx.runMutation(
internal.resend.actions.sendBatchEmails,
{
payloads,
campaignId: campaign._id,
executionId: execution._id,
teamId: campaign.teamId,
idempotencyKey: idempotencyKey,
}
);
console.debug("sentInBatch", sentInBatch);
console.debug("skippedInBatch", skippedInBatch);
const updatedTotalSent = sentInBatch + totalSent;
/**
* Determine next action based on sending progress:
* - Continue if more contacts remain and under warm limit
* - Move to next list if current list is complete
* - Finish if all lists processed or warm limit reached
*/
const hitWarmMax = updatedTotalSent >= args.warmMax;
console.debug("hitWarmMax", hitWarmMax);
console.debug("isDone", isDone);
const scheduleDone = async () => {
await ctx.runMutation(
internal.automation.postOffice.scheduleFinishedSendingMutation,
{
executionId: execution._id,
totalSent: updatedTotalSent,
}
);
};
const scheduleContinue = async ({
currentListIndex,
listIds,
paginationOpts,
totalSent,
pageNumber,
warmMax,
}: {
currentListIndex: number;
listIds: Id<"fubLists">[];
paginationOpts: Infer<typeof paginationOptsValidator>;
totalSent: number;
pageNumber: number;
warmMax: number;
}) => {
await ctx.runMutation(
internal.automation.postOffice.scheduleContinueSendingMutation,
{
executionId: execution._id,
paginationOpts: paginationOpts,
totalSent,
pageNumber,
currentListIndex,
listIds,
warmMax,
}
);
};
/**
* Branching logic for campaign progression:
* 1. If warm limit reached → Complete campaign (protects sender reputation)
* 2. If current list done:
* a. If more lists exist → Process next list
* b. If no more lists → Complete campaign
* 3. If current list has more contacts → Continue with same list
*/
if (hitWarmMax) {
console.log("scheduleDone: HIT WARM MAX", {
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: continueCursor,
},
totalSent: updatedTotalSent,
});
await scheduleDone();
} else if (isDone) {
const isLastList = args.currentListIndex === args.listIds.length - 1;
if (isLastList) {
/**
* No more lists!
*/
console.log("scheduleDone: LAST LIST", {
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: continueCursor,
},
totalSent: updatedTotalSent,
});
await scheduleDone();
} else {
/**
* Next list!
*/
console.log("scheduleContinue: NEXT LIST", {
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: null,
},
totalSent: updatedTotalSent,
});
await scheduleContinue({
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: null,
},
totalSent: updatedTotalSent,
pageNumber: args.pageNumber + 1,
currentListIndex: args.currentListIndex + 1,
listIds: args.listIds,
warmMax: args.warmMax,
});
}
} else {
/**
* Continue with the current list.
*/
console.log("scheduleContinue: CONTINUE WITH CURRENT LIST", {
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: continueCursor,
},
totalSent: updatedTotalSent,
});
await scheduleContinue({
paginationOpts: {
numItems: args.paginationOpts.numItems,
cursor: continueCursor,
},
totalSent: updatedTotalSent,
pageNumber: args.pageNumber + 1,
currentListIndex: args.currentListIndex,
listIds: args.listIds,
warmMax: args.warmMax,
});
}
} catch (error) {
console.error("Error sending campaign to contacts", error);
await ctx.runMutation(
internal.campaigns.executions.recordExecutionError,
{
executionId: args.executionId,
step: "emailSending",
message: `Sending failed: ${error}`,
}
);
throw error;
}
},
});
/**
* Wrapper mutation that schedules the continuation of email sending.
* This indirection is necessary to avoid stack overflow when recursively
* scheduling mutations.
*/
export const scheduleContinueSendingMutation = internalMutation({
args: {
currentListIndex: v.number(),
executionId: v.id("campaignExecution"),
listIds: v.array(v.id("fubLists")),
pageNumber: v.number(),
paginationOpts: paginationOptsValidator,
totalSent: v.number(),
warmMax: v.number(),
},
handler: async (ctx, args) => {
await handleContinueSendingFn(ctx, {
executionId: args.executionId,
pageNumber: args.pageNumber,
paginationOpts: args.paginationOpts,
totalSent: args.totalSent,
currentListIndex: args.currentListIndex,
listIds: args.listIds,
warmMax: args.warmMax,
});
},
});
/**
* Handles the continuation of email sending with proper rate limiting.
* This function:
* 1. Checks if the previous worker is still running
* 2. Waits if necessary to respect rate limits
* 3. Schedules the next batch of emails
*
* The recursive nature of this function (via sendCampaignToContacts)
* allows us to process large contact lists without overwhelming the system.
*/
const handleContinueSendingFn = async (
ctx: MutationCtx,
args: {
executionId: Id<"campaignExecution">;
paginationOpts: Infer<typeof paginationOptsValidator>;
totalSent: number;
pageNumber: number;
currentListIndex: number;
listIds: Id<"fubLists">[];
warmMax: number;
}
) => {
/**
* IMPORTANT: We must use scheduler.runAfter instead of runMutation to avoid:
* "Error: Cross component call depth limit exceeded. Do you have an infinite loop in your app?"
* The scheduler breaks the call stack and prevents this error.
*/
const {
executionId,
paginationOpts,
pageNumber,
totalSent,
currentListIndex,
listIds,
warmMax,
} = args;
/**
* Recursive batch processing loop:
* sendCampaignToContacts → scheduleContinueSendingMutation → handleContinueSendingFn
* ↑ ↓
* └──────────────────────────────────────────────┘
*
* This continues until:
* - All contacts have been processed, OR
* - We hit the warm-up limit (WARM_MAX)
*/
await ctx.scheduler.runAfter(
0,
internal.automation.postOffice.sendCampaignToContacts,
{
executionId: executionId,
paginationOpts: paginationOpts,
totalSent: totalSent,
pageNumber: pageNumber,
currentListIndex: currentListIndex,
listIds: listIds,
warmMax: warmMax,
}
);
};
export const scheduleFinishedSendingMutation = internalMutation({
args: {
executionId: v.id("campaignExecution"),
totalSent: v.number(),
},
handler: async (ctx, args) => {
await scheduleFinishedSendingFn(ctx, {
executionId: args.executionId,
totalSent: args.totalSent,
});
},
});
/**
* Schedules the completion of the email sending process.
* Similar to handleContinueSendingFn, this ensures the previous worker
* has completed before marking the execution as finished.
*/
const scheduleFinishedSendingFn = async (
ctx: MutationCtx,
args: {
executionId: Id<"campaignExecution">;
totalSent: number;
}
) => {
const { executionId, totalSent } = args;
await ctx.runMutation(
internal.automation.postOffice.handleFinishedSendingMutation,
{
executionId: executionId,
totalSent: totalSent,
}
);
};
export const handleFinishedSendingMutation = internalMutation({
args: {
executionId: v.id("campaignExecution"),
totalSent: v.number(),
},
handler: async (ctx, args) => {
await handleFinishedSendingFn(ctx, {
executionId: args.executionId,
totalSent: args.totalSent,
});
},
});
/**
* Finalizes the email sending process:
* 1. Updates the execution status to 'sent'
* 2. Records timing metrics
* 3. Schedules post-send follow-up tasks
*/
const handleFinishedSendingFn = async (
ctx: MutationCtx,
{
executionId,
totalSent,
}: { executionId: Id<"campaignExecution">; totalSent: number }
) => {
const execution = await ctx.db.get(executionId);
if (!execution) {
throw new Error("Execution not found");
}
const finishedSendingAt = Date.now();
const totalDurationSeconds =
(finishedSendingAt - execution.startedSendingAt) / 1000;
await ctx.db.patch(execution._id, {
status: "sent",
finishedSendingAt: finishedSendingAt,
totalDurationSeconds: totalDurationSeconds,
});
await schedulePostSendTasks(ctx, { executionId: execution._id });
console.log(
`[EXECUTION COMPLETE] ✅ Total emails sent for execution: ${execution._id} is ${totalSent}`
);
};
/**
* Schedules follow-up tasks after the campaign has been sent.
* These tasks handle:
* - Analytics collection at various intervals
* - Success notifications to the campaign creator
* - Performance monitoring for admins
*/
const schedulePostSendTasks = async (
ctx: MutationCtx,
{ executionId }: { executionId: Id<"campaignExecution"> }
) => {
/**
* Gradually increase the user's warm-up limit after successful campaign.
* This allows sending more emails in future campaigns while maintaining
* good sender reputation. The 50% increase strikes a balance between
* growth and safety.
*/
const execution = await getOrThrow(ctx, executionId);
const campaign = await getOrThrow(ctx, execution.campaignId);
const user = await getOrThrow(ctx, campaign.createdBy);
const currentWarmMax = user.warmMax || 50;
const newWarmMax = Math.round(currentWarmMax * 1.5);
/**
* Persist the increased warm limit for the user's next campaign
*/
await ctx.runMutation(internal.users.mutations.updateWarmMax, {
userId: user._id,
warmMax: newWarmMax,
});
/**
* Schedule analytics collection at strategic intervals:
* - 10 minutes: Initial engagement metrics (opens)
* - 1 hour: Early click-through rates
* - 4 hours: Meaningful engagement patterns
* - 24 hours: Complete performance report
*
* Admin-only tasks (10min, 1hr, 4hr) provide early insights
* for monitoring and intervention if needed.
*/
await ctx.scheduler.runAfter(
10 * 60 * 1000, // 10 minutes
internal.automation.campaignOrchestrator.processPostSendTasks,
{
executionId: executionId,
minutesAfterSending: 10,
includeCustomer: false,
}
);
await ctx.scheduler.runAfter(
60 * 60 * 1000, // 1 hour
internal.automation.campaignOrchestrator.processPostSendTasks,
{
executionId: executionId,
minutesAfterSending: 60,
includeCustomer: false,
}
);
await ctx.scheduler.runAfter(
4 * 60 * 60 * 1000, // 4 hours
internal.automation.campaignOrchestrator.processPostSendTasks,
{
executionId: executionId,
minutesAfterSending: 4 * 60,
includeCustomer: false,
}
);
/**
* Customer-facing summary report (24 hours):
* Comprehensive performance metrics including:
* - Total opens and unique open rate
* - Click-through rates by link
* - Device and email client breakdown
* - Geographic distribution of engagement
* - Recommendations for future campaigns
*/
await ctx.scheduler.runAfter(
24 * 60 * 60 * 1000, // 24 hours
internal.automation.campaignOrchestrator.processPostSendTasks,
{
executionId: executionId,
minutesAfterSending: 24 * 60,
includeCustomer: true,
}
);
};
export const sendBatchEmails = internalMutation({
args: {
payloads: v.array(v.any()),
campaignId: v.id("campaigns"),
executionId: v.id("campaignExecution"),
teamId: v.id("teams"),
idempotencyKey: v.string(),
},
handler: async (ctx, args) => {
if (process.env.NODE_ENV === "development") {
return await sendBatchEmailsDevelopment(ctx, args);
}
return await sendBatchEmailsProduction(ctx, args);
},
});
export const sendBatchEmailsProduction = async (
ctx: MutationCtx,
args: {
payloads: SendEmailOptions[];
teamId: Id<"teams">;
campaignId: Id<"campaigns">;
executionId: Id<"campaignExecution">;
idempotencyKey: string;
}
) => {
let totalSent = 0;
let totalSkipped = 0;
for (const payload of args.payloads) {
/**
* I use isUniqueSendFn() as a last ditch effort to ensure
* we don't send the same email twice. It's basically an
* idempotency key for the email.
*
* Resend offers idempotency keys, but they're only available
* on single API calls (so either a single email or one key for a batch of emails).
*/
const isUniqueSend = await isUniqueSendFn(ctx, {
to: payload.to,
executionId: args.executionId,
});
if (!isUniqueSend) {
totalSkipped++;
continue;
}
/**
* With Convex's Resend component, we can just load up
* the provided workpool and let it manage creating batches
* and efficiently shipping payloads to Resend.
*/
await resend.sendEmail(ctx, payload);
totalSent++;
/**
* I like to keep track of sent emails in the database.
* This is useful for analytics and for ensuring we don't
* send the same email twice.
*/
await upsertSentMailFn(ctx, {
teamId: args.teamId,
to: payload.to,
from: payload.from,
campaignId: args.campaignId,
executionId: args.executionId,
});
}
return {
sentInBatch: totalSent,
skippedInBatch: totalSkipped,
};
};
/**
* There are so many scale-related things to test,
* so I've added a development mode that allows us to
* send emails to Mailinator, an open throwaway email
* provider.
*
* Don't send anything sensitive or PII to Mailinator.
* It's all public.
*/
export const sendBatchEmailsDevelopment = async (
ctx: MutationCtx,
args: {
payloads: SendEmailOptions[];
campaignId: Id<"campaigns">;
teamId: Id<"teams">;
executionId: Id<"campaignExecution">;
idempotencyKey: string;
}
) => {
console.info("Sending batch emails in development mode to Mailinator");
const modifiedPayloads = args.payloads.map((payload) => ({
...payload,
to: `${args.campaignId}@mailinator.com`,
}));
for (const payload of modifiedPayloads) {
await resend.sendEmail(ctx, payload);
}
};
export async function isUniqueSendFn(
ctx: MutationCtx | QueryCtx,
args: {
to: string;
executionId: Id<"campaignExecution">;
}
): Promise<boolean> {
const existingEmail = await ctx.db
.query("sentMail")
.withIndex("by_to_and_execution", (q) =>
q.eq("to", args.to).eq("executionId", args.executionId)
)
.first();
if (existingEmail) {
return false;
}
return true;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment