Skip to content

Instantly share code, notes, and snippets.

@VldMrgnn
Last active September 7, 2024 23:59
Show Gist options
  • Save VldMrgnn/92fedef8a504b6fdebc51c680fb98136 to your computer and use it in GitHub Desktop.
Save VldMrgnn/92fedef8a504b6fdebc51c680fb98136 to your computer and use it in GitHub Desktop.
Implement server-side queue processing using Starfx
//this code is just for guidance.
import { each, main, on, resource, sleep } from "effection";
import {
createThunks,
keepAlive,
mdw,
parallel,
put,
run,
spawn,
take,
takeEvery,
takeLeading,
} from "starfx";
import { isErr } from "../helpers";
import type { ThunkCtx } from "starfx";
// imaginary task manager
/**
 * One unit of work tracked by the (imaginary) task manager.
 */
type TMyTask = {
// Identifier of the task; used as the key of the `tasks` map and of the `keyInProcess` set.
key: string;
// Parsed target of the task; `parseMessageEvent` builds it from `value.url`.
url: URL;
// Request options; `enqueue` initialises this to `{}`. Not used by the code visible here.
req: RequestInit;
// Raw payload taken from the incoming message; `parseMessageEvent` reads `value.url` from it.
value: any;
// Created by `enqueue`; `dequeue` uses it to abort a task that is already in process.
abortController: AbortController;
};
// Keys that `startProcess` has handed to `theMainProcess`; `theMainProcess` removes a key once its work is done.
const keyInProcess = new Set<string>();
// Tasks registered by `enqueue`, indexed by their key; `dequeue` removes entries that are not in process.
const tasks = new Map<string, TMyTask>();
/**
 * Runs a single task as an effection resource.
 *
 * Registers an abort handler on the task's `AbortController`, performs the
 * (placeholder) work, removes the task's key from `keyInProcess` and then
 * provides a small status record.
 *
 * @param task - The task to process. `task.url` and `task.req` are available
 *   for the actual work once it is filled in.
 * @returns A resource operation that provides `{ key, status: "done" }`.
 */
const theMainProcess = (task: TMyTask) =>
  resource(function* (provide) {
    const { key, abortController } = task;
    // `once: true` lets the listener be dropped as soon as it has fired
    // instead of staying registered on the signal afterwards.
    abortController.signal.addEventListener(
      "abort",
      function () {
        // The listener is a plain callback, so cleanup that needs to yield
        // is started as its own operation.
        run(function* () {
          // do more cleanup here
          yield* sleep(0);
        });
      },
      { once: true },
    );
    // do the work here
    //...
    // The key is released before the result is provided, so `startProcess`
    // no longer sees this task as in process.
    keyInProcess.delete(key);
    // some output
    yield* provide({ key, status: "done" });
  });
// Thunk registry for this module; `enqueue`, `dequeue` and `startProcess` are all created from it.
export const thunks = createThunks<ThunkCtx>();
// Error middleware shipped with starfx (`mdw.err`) — presumably catches/logs errors raised further down the pipeline; confirm against the starfx docs.
thunks.use(mdw.err);
// Middleware produced by the registry itself — presumably what routes a dispatched thunk to the handler given to `thunks.create`; confirm.
thunks.use(thunks.routes());
/**
 * Extracts the task fields from an incoming message event.
 *
 * The function is a generator only so it can be delegated to with `yield*`;
 * it never yields and completes synchronously.
 *
 * @param event - Message event whose `data` is expected to carry `key` and
 *   `value`, with `value.url` being parseable by `new URL(...)`.
 * @returns `{ key, value, url }` on success, or `{ error }` when `data` is an
 *   error according to `isErr`, or when the payload is malformed (missing
 *   `value`, invalid URL, non-object data).
 */
const parseMessageEvent = function* (event: MessageEvent) {
  const { data } = event;
  if (isErr(data)) {
    return { error: data };
  }
  try {
    const { key, value } = data as TMyTask;
    const url = new URL(value.url);
    return { key, value, url };
  } catch (error: unknown) {
    // Messages come from outside the process: report a bad payload through
    // the same `{ error }` shape instead of throwing out of the parser.
    return { error };
  }
};
/**
 * Thunk that registers a task in the `tasks` map.
 *
 * A fresh task record is built from the payload's `key`, `value` and `url`,
 * with an empty request and a new `AbortController`, and stored under `key`
 * (replacing any record already stored under that key).
 */
const enqueue = thunks.create<TMyTask>(
  "/thunks/enqueue",
  // per the original author: takeEvery is a queue itself underneath
  { supervisor: takeEvery },
  function* (ctx, next) {
    const incoming = ctx.payload;
    const entry: TMyTask = {
      key: incoming.key,
      value: incoming.value,
      url: incoming.url,
      req: {},
      abortController: new AbortController(),
    };
    tasks.set(entry.key, entry);
    yield* next();
  },
);
/**
 * Thunk that removes a task from the queue.
 *
 * - If the task is currently in process (its key is in `keyInProcess`), its
 *   `AbortController` is aborted with the reason "user request" and the thunk
 *   returns without calling `next()`; the entry stays in `tasks`.
 * - Otherwise the entry is deleted from `tasks` and the pipeline continues.
 */
const dequeue = thunks.create<TMyTask>(
  "/thunks/dequeue",
  { supervisor: takeEvery },
  function* (ctx, next) {
    const { key } = ctx.payload;
    // if is in process ... then halt
    if (keyInProcess.has(key)) {
      // `tasks` is a Map: entries must be read with `.get()`. Property
      // indexing (`tasks[key]`) is always undefined and made this line throw.
      tasks.get(key)?.abortController.abort("user request");
      return;
    }
    tasks.delete(key);
    yield* next();
  },
);
/**
 * Thunk that walks the `tasks` map and spawns `theMainProcess` for every
 * task whose key is not yet in `keyInProcess`, marking the key as in process
 * before spawning. Registered with the `takeLeading` supervisor.
 */
const startProcess = thunks.create(
  "/thunks/startProcess",
  { supervisor: takeLeading },
  function* (_ctx, next) {
    for (const [taskKey, pending] of tasks) {
      // add bottleneck here
      // add guardians...
      const alreadyRunning = keyInProcess.has(taskKey);
      if (!alreadyRunning) {
        keyInProcess.add(taskKey);
        yield* spawn(() => theMainProcess(pending));
      }
    }
    yield* next();
  },
);
/**
 * Root operation of the task manager.
 *
 * 1. Spawns a background group that runs `thunks.bootup` next to a
 *    dispatcher. The dispatcher takes every action and maps the action types
 *    "enqueue", "dequeue" and "process" onto the corresponding thunks; any
 *    other type is only logged.
 * 2. Listens for "message" events on `self`, parses each one with
 *    `parseMessageEvent` and puts an action whose type is the pathname of
 *    the parsed URL and whose payload is `{ key, value, url }`.
 *
 * Messages that cannot be parsed are logged and skipped.
 */
function* taskManager() {
  const dispatcher = function* () {
    while (true) {
      const action = yield* take("*");
      const { type: actionType, payload } = action as {
        type: string;
        payload: TMyTask;
      };
      // NOTE(review): the action type put below is `url.pathname`, which for
      // http(s) URLs starts with "/" (e.g. "/enqueue") and would fall through
      // to `default` — confirm the URL scheme / intended action names.
      switch (actionType) {
        case "enqueue":
          yield* spawn(() => enqueue.run(payload));
          break;
        case "dequeue":
          yield* spawn(() => dequeue.run(payload));
          break;
        case "process":
          yield* spawn(() => startProcess.run());
          break;
        default:
          console.log("actionType", actionType);
          break;
      }
    }
  };
  yield* spawn(() =>
    run(function* () {
      const group = yield* parallel([thunks.bootup, dispatcher]);
      yield* group;
    }),
  );
  // or incoming message from "express.js" server
  // or websocket server
  for (const event of yield* each(on(self, "message"))) {
    const parsed = yield* parseMessageEvent(event);
    if (parsed.url === undefined) {
      // The parser reported an error: there is no URL to derive an action
      // from, so log the problem and move on to the next message.
      console.error("taskManager: ignoring unparseable message", parsed.error);
    } else {
      // Use the parsed URL object (the raw `value.url` is only a string and
      // has no `pathname`) and forward the fields `enqueue` destructures.
      const { key, value, url } = parsed;
      yield* put({ type: url.pathname, payload: { key, value, url } });
    }
    yield* each.next();
  }
}
// Entry point: `main(taskManager)` starts the task manager, and its result is passed to starfx's `keepAlive`.
// NOTE(review): the value returned by `keepAlive` is discarded here — confirm it does not have to be run/yielded to take effect.
keepAlive([main(taskManager)]);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment