@VldMrgnn
Created January 18, 2025 04:55
Starfx middleware for fetching data on the main thread from the backend through a worker.
/*
* Showcase Example: Fetching Data via a Worker
*
* In this example, data is stored and fetched from a worker.
* It mimics the classic fetch behavior but instead passes the request through a worker.
* At request time, you can enhance the logic (e.g., add additional hashes)
* and at response time, perform parsing or other heavy-lifting operations.
*/
import { nanoid } from 'nanoid';
import { ApiCtx, Next, Ok, Result, safe } from 'starfx';
import { isOk } from '../helpers';
// this is part of @effection-contrib/websocket package but not exported as standalone
import { withResolvers } from '../helpers/with-resolvers';
// my cache-validation worker
import { getVacacheWorker } from '../workers/worker-factory-vacache';
export type WApiCtx = ApiCtx & { workerPayload?: any };
/**
 * This middleware sends a fetch-like request to the worker.
 * The worker fetches the data and returns it to the main thread.
 * @param {WApiCtx} ctx
 * @param {Next} next
 */
export function* workerFetch(ctx: WApiCtx, next: Next) {
  const { url, ...options } = ctx.req();
  const worker = getVacacheWorker();
  if (!worker) {
    throw new Error("Worker not started");
  }

  const resolver = withResolvers<Result<Response>>();
  const id = `${Date.now()}-${Math.random()}-${nanoid()}`;

  const handleMessage = (event: MessageEvent) => {
    if (event.data?.id === id) {
      worker.removeEventListener("message", handleMessage);
      if (event.data.error) {
        resolver.reject(new Error(event.data.error));
        return;
      }
      const { response } = event.data;
      if (response.body instanceof ReadableStream) {
        // Handle a streamed response: drain the stream, then build a Response
        const reader = response.body.getReader();
        const chunks: any[] = [];
        const processChunks = () =>
          reader.read().then(({ done, value }) => {
            if (done) {
              // Construct the Response only if the status allows a body
              const options: ResponseInit = { status: response.status };
              if (response.headers) {
                options.headers = new Headers(response.headers);
              }
              const body = [204, 205, 304].includes(response.status)
                ? null // no body for these status codes
                : JSON.stringify(chunks);
              resolver.resolve(Ok(new Response(body, options)));
            } else {
              chunks.push(value);
              return processChunks();
            }
          });
        processChunks();
      } else {
        // Handle non-stream responses
        const options: ResponseInit = { status: response.status };
        if (response.headers) {
          options.headers = new Headers(response.headers);
        }
        const body = [204, 205, 304].includes(response.status)
          ? null // no body for these status codes
          : JSON.stringify(response.body);
        resolver.resolve(Ok(new Response(body, options)));
      }
    }
  };

  worker.addEventListener("message", handleMessage);
  worker.postMessage({ id, url, options });

  const res = yield* resolver.operation;
  ctx.response = isOk(res)
    ? res.value
    : new Response(JSON.stringify({ error: res.error }), { status: 500 });
  ctx.json = yield* safe(() => ctx.response.json());
  yield* next();
}
VldMrgnn commented Jan 18, 2025

A few notes on the usage of the workerFetch middleware

The flow now goes like this:

Step 1: The main thread sends the request to the worker (via the Starfx middleware).
Step 2: The worker runs some validations and, if needed, forwards the request to the backend.
Step 3: The worker interprets the results and sends them back to the main thread (the worker-side bridge is sketched below).
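
The gist doesn't include the worker-side glue that turns these messages into thunk dispatches. A minimal sketch, assuming a starfx store runs inside the worker (workerStore is a hypothetical name) and dispatches the vacaShard thunk defined further below:

// worker: message bridge (hypothetical glue code, not part of the gist)
self.addEventListener("message", (event: MessageEvent) => {
  const { id, url, options } = event.data ?? {};
  if (!id || !url) return; // ignore messages that are not fetch requests
  // Hand the request to the thunk; it posts the response back under the same id
  workerStore.dispatch(vacaShard({ id, url, options }));
});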

Main Thread Setup

Configure your API with the required middleware, including the custom workerFetch.

// main thread: API configuration
export const vacaWorker = createApi<ApiCtx>();
vacaWorker.use(debugMdw);
vacaWorker.use(mdw.err);
vacaWorker.use(mdw.api({ schema }));
vacaWorker.use(mdw.queryCtx);
vacaWorker.use(vacaWorker.routes());
vacaWorker.use(workerFetch); // Adds fetch via worker
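
The middleware resolves the worker via getVacacheWorker() and throws "Worker not started" if none exists. The worker-factory-vacache module isn't part of the gist; a minimal sketch of such a factory, assuming a bundler that supports the new URL(..., import.meta.url) worker pattern:

// workers/worker-factory-vacache.ts (hypothetical sketch)
let vacacheWorker: Worker | null = null;

export function startVacacheWorker(): Worker {
  if (!vacacheWorker) {
    vacacheWorker = new Worker(
      new URL("./vacache.worker.ts", import.meta.url),
      { type: "module" },
    );
  }
  return vacacheWorker;
}

export function getVacacheWorker(): Worker | null {
  return vacacheWorker;
}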

Thunk Example on Main Thread

Define a thunk to trigger the GET request through the worker and process the returned response.

// thunk example on main thread
export const fetchThroughWorker = vacaWorker.get<string>(
  "vaca/fetchThroughWorker",
  { supervisor: takeEvery },
  function* (ctx, next) {
    ctx.request = ctx.req({ method: "GET", url: ctx.payload });
    yield* next();
    const data = ctx.json;
    // ... further processing as needed
  },
);
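
Dispatching the endpoint from the main thread then works like any other starfx thunk; this assumes a configured starfx store named store, and the URL is a made-up example:

// main thread: trigger a fetch through the worker
store.dispatch(fetchThroughWorker("vaca/some-dataset")); // hypothetical URL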

Worker Thunk for Data Processing

The worker receives the request, processes the data, and sends the result back. We set up thunk middleware in the worker, too.

const vacaShard = wThunks.create<{
  id: string;
  url: string;
  options: any;
}>(
  "/thunks/vcw/vacaShard",
  { supervisor: takeEvery },
  function* (ctx: ThunkCtx, next: Next) {
    const { id, url, options } = ctx.payload;

    try {
      // Fetch and process the data, wrapping the outcome as a Result
      const processedResult = Ok([] as any[]); // ...fetch and process the data here

      // Send the processed dataset back to the main thread
      yield* call(() => respondWithDataset(id, processedResult));
    } catch (error) {
      // Post the error back to the main thread (see the sketch below)
    }
    yield* next();
  },
);
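
On the main thread, handleMessage rejects whenever a message carries an error field, so the catch branch only needs to post the error string under the same id. A minimal sketch:

// worker: post the error back under the same request id
self.postMessage({
  id,
  error: error instanceof Error ? error.message : String(error),
});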

Sending the Response from the Worker

Return the processed result to the main thread—either as a complete response or a stream.

export function* respondWithDataset(
  id: string,
  processedResult: Result<any[] | number>,
) {
  if (!isOk(processedResult)) {
    self.postMessage({
      id,
      response: {
        status: 500, // or another status code (204, 205, 304, ...) as appropriate
        body: { error: processedResult.error.message }, // or null 
        headers: { "Content-Type": "application/json" },
      },
    });
    return;
  }
  // On success, send data back as a stream
  yield* call(() => sendResponseAsStream(id, processedResult.value));
}

Helper Functions for Streaming

Create and send a readable stream from an array of data.

function createReadableStream(data: any[]) {
  let index = 0;
  return new ReadableStream({
    pull(controller) {
      if (index < data.length) {
        controller.enqueue(data[index++]);
      } else {
        controller.close();
      }
    },
    cancel() {
      console.log("Stream cancelled");
    },
  });
}

function sendResponseAsStream(id: string, data: any[]) {
  const stream = createReadableStream(data);
  self.postMessage(
    {
      id,
      response: {
        status: 200,
        body: stream,
        headers: { "Content-Type": "application/json" },
      },
    },
    [stream],
  );
}
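
One caveat: passing [stream] as the transfer list relies on transferable streams, which not every browser supports. Where the transfer fails (a DataCloneError), a fallback could post the array as a plain body and let the middleware's non-stream branch wrap it; a sketch:

// worker: fall back to a plain body when the stream cannot be transferred
function sendResponseSafe(id: string, data: any[]) {
  try {
    sendResponseAsStream(id, data);
  } catch {
    self.postMessage({
      id,
      response: {
        status: 200,
        body: data, // the middleware JSON.stringifies non-stream bodies
        headers: { "Content-Type": "application/json" },
      },
    });
  }
}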
