Skip to content

Instantly share code, notes, and snippets.

@dmmulroy
Created May 27, 2025 23:48
Show Gist options
  • Save dmmulroy/c4b7e9d4c43e03c38782e4620304960f to your computer and use it in GitHub Desktop.
Save dmmulroy/c4b7e9d4c43e03c38782e4620304960f to your computer and use it in GitHub Desktop.
import { Effect, Layer, Stream } from "effect";
import { handleOntraportCreateContact } from "./handle-ontraport-create-contact";
import { handleProccesMetricsUpload } from "./handle-process-metrics-upload";
import { handleSendVerificationEmail } from "./handle-send-verification-email";
import { Message } from "./message";
import { Postgres } from "./postgres";
import { RetryPolicy } from "./retry-policy";
import { flatten } from "flat";
export type JobStatus = "new" | "locked" | "completed" | "error";
export type JobType =
| "process_metrics_upload"
| "ontraport_create_contact"
| "send_verification_email";
export type Job = Readonly<{
id: string;
type: JobType;
status: JobStatus;
payload: string;
completedAt: Date;
lockedAt: Date;
createdAt: Date;
updatdAt: Date;
}>;
const make = Effect.gen(function* () {
yield* Effect.logInfo("Starting JobQueue");
yield* Effect.addFinalizer(() => Effect.logInfo("JobQueue stopped"));
const postgres = yield* Postgres;
yield* Effect.forkScoped(
postgres.listen("job_queue_channel").pipe(
Stream.runForEach((jobId) =>
postgres.getJobById(jobId).pipe(
Effect.flatten,
Effect.andThen((job) => Message.decode(job.payload)),
Effect.andThen((message) => [jobId, message] as const),
Effect.andThen(handleMessage),
Effect.onError(Effect.logError),
Effect.withSpan("job_queue.job", {
attributes: {
"job.id": jobId,
},
}),
),
),
),
);
yield* Effect.logInfo("JobQueue started");
});
export const JobQueue = Layer.scopedDiscard(make).pipe(
Layer.annotateSpans({ module: "JobQueue" }),
);
export function handleMessage([jobId, message]: readonly [string, Message]) {
return Effect.gen(function* () {
yield* Effect.logInfo(`Handling job '${jobId}' of type '${message._tag}'`);
const retryPolicy = yield* RetryPolicy;
const postgres = yield* Postgres;
const withRetry = Effect.retry(retryPolicy);
const run = <A, E, R>(effect: Effect.Effect<A, E, R>) => {
return Effect.matchEffect(withRetry(effect), {
onFailure(cause) {
return Effect.gen(function* () {
yield* Effect.logError(
`An error occurred while handling job '${jobId}' of type '${message._tag}': ${cause}`,
);
yield* withRetry(postgres.setJobStatus(jobId, "error"));
return yield* Effect.fail(cause);
});
},
onSuccess(a) {
return Effect.gen(function* () {
yield* Effect.logInfo(
`Successfully handled job '${jobId}' of type '${message._tag}'`,
);
yield* withRetry(postgres.setJobStatus(jobId, "completed"));
return yield* Effect.succeed(a);
});
},
});
};
switch (message._tag) {
case "send_verification_email": {
return yield* Effect.forkScoped(
run(handleSendVerificationEmail(message)),
);
}
case "process_metrics_upload": {
return yield* Effect.forkScoped(
run(handleProccesMetricsUpload(message)),
);
}
case "ontraport_create_contact": {
return yield* Effect.forkScoped(
run(handleOntraportCreateContact(message)),
);
}
}
}).pipe(
Effect.withSpan("job_queue.handle_message", {
attributes: flatten({
"job.id": jobId,
message,
}),
}),
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment