Skip to content

Instantly share code, notes, and snippets.

@VldMrgnn
Last active July 28, 2024 15:33
Show Gist options
  • Save VldMrgnn/0a4b3082e33a48fbee27b6d057866bca to your computer and use it in GitHub Desktop.
Save VldMrgnn/0a4b3082e33a48fbee27b6d057866bca to your computer and use it in GitHub Desktop.
indexedDB starfx persistor with forwarding state to server for avoiding eviction
// we could store on filesystem as well...
export async function setPersisted(req: Request) {
const group = Object.values(req.session.groups || {}).find(
(x) => x.PERSEED === req.session.INSIDER
);
if (!group) {
console.log("setPersisted", "no group found");
return {};
}
// const { data } = req.body;
const data = req.body;
if (process.env.NODE_ENV === "development") {
const sizeInMB = data.length / 1_048_576;
console.log("[dev.] sizeInMB", sizeInMB);
}
const key = `${rkey.INSIDER_STATE}:${req.session.email}`;
// Store buffer in Redis directly as binary data
rasydb0.hset(key, ["persist", data]);
return {}; // We don't use the return value
}
export async function getPersisted(req: Request) {
const key = `${rkey.INSIDER_STATE}:${req.session.email}`;
const base64Data = (await rasydb0.hget(key, "persist")) as string;
return base64Data; // (!)
}
import { openDB } from "idb";
import storage from "localforage";
import {
AnyState,
call,
Err,
Ok,
Operation,
PersistAdapter,
Result,
select as starfxSelect,
sleep,
} from "starfx";
import { PERSISTOR_NAME } from "@app/state/constants";
import { schema } from "@app/statefx/schema";
import { processSkipForwardPersistor } from "@app/statefx/selectors/applog.sel";
import { getPxWorker } from "@app/statefx/workers/pxworker-factory";
import { allowlist, AppState } from "@app/statefx/xstore";
import { debounceThx } from "@app/statefx/xThunks/ui-persist";
const CONST_PERSISTOR_VERSION = 9;
export function* forwardPersitence(
key: string = null,
jsonState: string = null
) {
if (jsonState === null) {
const s_ = yield* starfxSelect((a) => a);
const s = {};
for (const k of allowlist) {
s[k] = s_[k];
}
jsonState = JSON.stringify(s);
}
const who = yield* starfxSelect(schema.whoami.select);
if (who.wai_database) {
const pxWorker = getPxWorker();
if (!pxWorker) {
console.log("no pxWorker");
}
if (pxWorker) {
// custom debounce middleware
const canPost = yield* call(() => debounceThx.run());
if (canPost.key === "out") {
return Ok(undefined);
}
// includes the lock and the collection active
const shouldSkip = yield* starfxSelect(processSkipForwardPersistor);
// create the waiting mechanism but exit if the lock is on
if (shouldSkip) {
return Ok(undefined);
}
if (canPost.key === "in") {
// efficient way to send the data
// Encode the JSON string to an ArrayBuffer
const buffer = new TextEncoder().encode(jsonState).buffer;
console.log(buffer.byteLength, "before"); // 4MB
pxWorker.postMessage(
{
type: "/runner/arrayBuffer",
data: buffer,
},
[buffer]
);
console.log(buffer.byteLength, "after"); //0 bytes
}
}
}
}
export const openDbfn = (pName = PERSISTOR_NAME) => {
if (!pName) throw new Error("pName is required");
return openDB(pName, CONST_PERSISTOR_VERSION, {
upgrade(db) {
if (!db.objectStoreNames.contains("persist")) {
db.createObjectStore("persist");
}
// ui
if (!db.objectStoreNames.contains("ui")) {
db.createObjectStore("ui");
}
// workermain
if (!db.objectStoreNames.contains("workermain")) {
db.createObjectStore("workermain");
}
},
});
};
export function createIDBStorageAdapter<S extends AnyState>(
pName: string
): PersistAdapter<S> {
storage.config({
driver: storage.INDEXEDDB,
name: pName,
version: CONST_PERSISTOR_VERSION,
storeName: "persist",
});
return {
getItem: function* get(key: string): Operation<Result<S>> {
try {
const ostorage = yield* call(storage.getItem(key)) as Operation<string>;
const returnvalue = JSON.parse(ostorage || "{}");
return Ok(returnvalue);
} catch (error) {
console.error("Error getting item from storage:", error);
return Err(error);
}
},
setItem: function* set(key: string, s: Partial<S>) {
const state = JSON.stringify(s);
try {
yield* call(storage.setItem(key, state));
// forward a slice of the state to persist in redis.
yield* forwardPersitence(key, state);
// maybe is too much ....
return Ok(undefined);
} catch (error) {
console.error("Error setting item to storage:", error, key, state);
return Err(error);
}
},
removeItem: function* remove(key: string) {
try {
yield* call(storage.removeItem(key));
return Ok(undefined);
} catch (error) {
console.error("Error removing item from storage:", error);
return Err(error);
}
},
};
}
import { each, ensure, main, on, resource, run } from "effection";
import { gzip } from "fflate";
import {
call,
createApi,
createThunks,
keepAlive,
mdw,
parallel,
put,
sleep,
spawn,
take,
takeLeading,
} from "starfx";
import { IndexeddbPersistence } from "y-indexeddb";
import { WebsocketProvider } from "y-websocket";
import * as Y from "yjs";
import { service } from "@app/service";
import { PERSISTOR_NAME } from "@app/state/constants";
import { isErr, isOk } from "../helpers";
import { parseMessageEvent } from "./helpers";
const dev = process.env.NODE_ENV === "development";
const serverUrl =
process.env.NODE_ENV === "production"
? process.env.REACT_APP_SERVERURL_PROD || ""
: process.env.REACT_APP_SERVERURL_DEV || "";
const wsUrl = serverUrl.replace("http", "ws");
const pxMap = new Map<string, any>();
pxMap.set("hydrate", true);
pxMap.set("email", "");
pxMap.set("perseed", "");
//
import type { ApiCtx, ThunkCtx, Next } from "starfx";
let storeName; // [findme#320]
type TWorkerParams = {
key: string;
url: URL;
req: Request;
data: any;
body?: any;
isLast?: boolean;
meta?: {
halt: boolean;
};
};
// gas
const persitInRedis = [
//.. ui stuff
];
//ice = where shid0 is null
//liquid = where shid0 is not null
const tables = [
"currencyRates",
//'....
];
const shid0s = [];
// ["currencyRates", "invoiceFiscal", "SPVInvoiceRaw", "SPVMessages", "SPVMessageKeys", "SPVMessageUserState"];
/* -------------------------------------------------------------------------- */
const setStoreName = (name: string) => {
if (!name) {
return;
}
// first we have no email.
// the worker starts on saga-query store bootup
storeName = name;
run(function* () {
yield* call(() => getWsPersist.run());
});
};
const getStoreName = () => {
return storeName;
};
function* setStoreParams({
perseed,
email,
}: {
perseed: string;
email: string;
}) {
const stopSync = yield* call(pxSync(email, perseed));
console.log("stopSync >> SHOULD BE THE TEXT OF THE FUNCTION:", stopSync);
// to really stop the sync we need to call the stopSync function
}
const pxApi = createApi<ApiCtx>();
pxApi.use(mdw.err);
pxApi.use(mdw.queryCtx);
pxApi.use(mdw.nameParser);
pxApi.use(pxApi.routes());
pxApi.use(mdw.fetch({ baseUrl: service }));
export const pxThunks = createThunks<ThunkCtx>();
pxThunks.use(mdw.err);
pxThunks.use(pxThunks.routes());
const filterStore = (s) => {
const subset = {};
for (const key of persitInRedis) {
if (key in s) {
subset[key] = s[key];
}
}
// latest data: maybe is too much
for (const key of shid0s) {
if (key in s) {
subset[key] = Object.fromEntries(
Object.entries(s[key]).filter(([k, v]: [string, any]) => !!v?.shid0)
);
}
}
// the full dataset. idk how o optimize
for (const key of tables) {
if (key in s) {
subset[key] = s[key];
}
}
return subset;
};
/* --------------------------------------------------------------------------
v1. hand made
/* -------------------------------------------------------------------------- */
function compressData(data: unknown): Promise<ArrayBuffer> {
// Convert the object to a JSON string
const jsonString = JSON.stringify(data);
return new Promise((resolve, reject) => {
// Gzip compress the JSON string
gzip(new TextEncoder().encode(jsonString), (err, compressed) => {
if (err) {
reject(err);
} else {
// Resolve with the buffer part of the Uint8Array
resolve(
compressed.buffer.slice(
compressed.byteOffset,
compressed.byteOffset + compressed.byteLength
)
);
}
});
});
}
const getPersist = pxApi.get(
"/persist",
{ supervisor: takeLeading },
function* (ctx: ApiCtx, next: Next) {
yield* next();
if (isErr(ctx.json)) {
return;
}
if (isOk(ctx.json)) {
if (ctx?.json?.value) {
/// can we transfer the data to the main thread?
self.postMessage({ type: "/rehydrate", payload: ctx.json.value });
}
}
}
);
const postPersist = pxApi.post<ArrayBuffer>(
"/persist",
{ supervisor: takeLeading },
function* (ctx, next) {
const jsonString = new TextDecoder().decode(ctx.payload);
const data = JSON.parse(jsonString);
const relevantData = filterStore(data);
const compressed = yield* call(() => compressData(relevantData));
// inspect the size of the compressed data in MB
// console.log(
// "compressed.byteLength",
// compressed.byteLength / 1024 / 1024,
// "MB"
// );
ctx.request = ctx.req({
method: "POST",
url: `/persist`,
headers: {
"Content-Type": "application/octet-stream",
"Content-Encoding": "gzip",
credentials: "include",
},
body: compressed,
});
yield* next();
}
);
function* worker() {
const dispatcher = function* () {
while (true) {
const action = yield* take("*");
const { type: actionType, payload } = action as {
type: string;
payload: any;
};
switch (actionType) {
default:
console.log("did you forget this action: ", actionType, "?");
if (actionType.startsWith("/nop/")) {
break;
}
break;
}
}
};
yield* spawn(() =>
run(function* () {
const group = yield* parallel([
dispatcher,
pxApi.bootup,
pxThunks.bootup,
]);
yield* group;
})
);
// listener //
for (const event of yield* each(on(self, "message"))) {
//the fastest track //
if (event.data?.type === "/runner/arrayBuffer") {
// yield* spawn(() => postPersist.run(event.data.data));
yield* spawn(() => shardPersist.run(event.data.data));
yield* each.next();
continue;
}
// on login, set the email and perseed
if (event.data?.type === "/upgrade") {
try {
const { email, perseed } = JSON.parse(event.data.data);
yield* spawn(() => setStoreParams({ perseed, email }));
} finally {
yield* each.next();
}
continue;
}
const paramResult = yield* parseMessageEvent(
event,
setStoreName,
getStoreName
);
if (isErr(paramResult)) {
yield* each.next();
continue;
}
const { url } = paramResult.value;
yield* put({ type: url.pathname, payload: paramResult.value });
yield* each.next();
}
}
keepAlive([main(worker)]);
/// IN PROGRESS. BUT PROVEN IT IS A VIABLE SOLUTION
import { each, ensure, main, on, resource, run } from 'effection';
import { gzip } from 'fflate';
import {
call, createApi, createThunks, keepAlive, mdw, parallel, put, sleep, spawn, take, takeLeading
} from 'starfx';
import { IndexeddbPersistence } from 'y-indexeddb';
import { WebsocketProvider } from 'y-websocket';
import * as Y from 'yjs';
import { service } from '@app/service';
import { PERSISTOR_NAME } from '@app/state/constants';
import { isErr, isOk } from '../helpers';
import { parseMessageEvent } from './helpers';
const dev = process.env.NODE_ENV === "development";
const serverUrl =
process.env.NODE_ENV === "production"
? process.env.REACT_APP_SERVERURL_PROD || ""
: process.env.REACT_APP_SERVERURL_DEV || "";
const wsUrl = serverUrl.replace("http", "ws");
const pxMap = new Map<string, any>();
pxMap.set("hydrate", true);
pxMap.set("email", "");
pxMap.set("perseed", "");
//
// ---------------
// this works somehow but not in production yet.
let pyDoc = new Y.Doc();
let provider: IndexeddbPersistence;
let wsProvider: WebsocketProvider;
const pxSync = (email, perseed) => {
return resource(function* (provide) {
if (!email) {
return;
}
/// some parameters mitigation
pyDoc = new Y.Doc();
provider = new IndexeddbPersistence(`${PERSISTOR_NAME}.sync`, pyDoc);
wsProvider = new WebsocketProvider(wsUrl, "starfx-persist", pyDoc, {
params: {
email,
perseed,
},
});
wsProvider.connect();
function* stopSync() {
console.log("stopSync");
yield* call(() => wsProvider.disconnect());
yield* call(() => provider.destroy());
}
wsProvider.on("status", (event) => {
console.log("WSPROVIDER", event.status); // logs "connected" or "disconnected"
});
pyDoc.on("afterTransaction", async (transaction, remote) => {
const once = pxMap.get("hydrate");
console.log("hydrate", once);
if (once) {
const persisted = remote.getMap("persistor");
const data = persisted.get("data");
if (data) {
//guardian :: compare the current login/perseed with the persisted one.
// some logic here
self.postMessage({
type: "/rehydrate",
payload: JSON.stringify(statePersisted),
});
pxMap.set("hydrate", false);
}
return;
}
});
yield* provide(stopSync);
});
};
// todo: with logout operation at the end, after sync
const shardPersist = pxThunks.create<ArrayBuffer>(
"/shardPersist/yjs",
{ supervisor: takeLeading },
function* (ctx, next) {
const jsonString = new TextDecoder().decode(ctx.payload);
const fullDataset = JSON.parse(jsonString);
const filtered = filterStore(fullDataset);
const persistor = pyDoc.getMap("persistor");
persistor.set("data", filtered);
// yield* call(() => provider.whenSynced);
yield* next();
}
);
const getWsPersist = pxThunks.create(
"/persist/first",
{ supervisor: takeLeading },
function* (ctx, next) {
pxMap.set("hydrate", true);
// yield* call(() => provider.whenSynced);
yield* next();
}
);
//....
const allowlist = [...] as (keyof AppState)[];
export const persistor = createPersistor<AppState>({
//m. main
adapter: createIDBStorageAdapter<AppState>(PERSISTOR_NAME),
allowlist: allowlist,
});
export const store = createStore({
initialState: initialState,
middleware: [persistStoreMdw(persistor)],
});
window["fx"] = store; //important //
export const runFxState = () => {
devtoolsEnabled && setupDevTool({}, { name: "starfx", enabled: true });
store.run(function* () {
yield* persistor.rehydrate();
yield* schema.update(schema.loaders.success({ id: PERSIST_LOADER_ID }));
// yield* spawn(() => initWorker());
const group = yield* parallel([
...tasks,
xApi.bootup,
xThunk.bootup,
xWorker.bootup,
xouterApi.bootup,
xDbc.bootup,
]);
yield* group;
});
return store;
};
// px.worker-factory.ts
interface IWorkerOne {
hello?: (name: string) => Promise<string>;
postMessage?: (data: any) => void;
onmessage?: (data: any) => void | null;
}
let workerInstance = null;
export const startPxWorker = () => {
if (!workerInstance) {
console.log("px.worker INSTANCE STARTING startWorker");
workerInstance = new Worker(new URL("./px.worker.ts", import.meta.url));
}
return workerInstance;
};
export const terminatePxWorker = () => {
if (workerInstance) {
workerInstance.terminate();
workerInstance = null;
}
};
export const getPxWorker = () => {
return workerInstance;
};
// index.ts
export const initPxWorker = (userseed: string) => {
return resource(function* (provide) {
const pxWorker = startPxWorker();
pxWorker.postMessage(`idb@${userseed}`);
pxWorker.onmessage = (e) => {
switch (e.data.type) {
case "hello":
console.log("px.worker: hello", e.data);
break;
case "pong":
console.log("px.worker: pong", e.data);
break;
case "/rehydrate": {
fxDispatch(redisRehydrate(e.data.payload));
break;
}
default: {
const { type, payload, json } = e.data;
fxDispatch({ type, payload: json ?? payload });
if (process.env.NODE_ENV === "development") {
console.log("[dev]. px.worker: default action", e.data);
}
break;
}
}
};
yield* provide(pxWorker);
});
};
@VldMrgnn
Copy link
Author

As work in progress there is another approach that works fine but needs more attention:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment