Skip to content

Instantly share code, notes, and snippets.

@VldMrgnn
Last active November 19, 2024 15:19
Show Gist options
  • Save VldMrgnn/63f015a262ebfd06cabc7cb852babbf7 to your computer and use it in GitHub Desktop.
Save VldMrgnn/63f015a262ebfd06cabc7cb852babbf7 to your computer and use it in GitHub Desktop.
Sync a Starfx slice via y-websocket.
import http from "http";
import wsUtils from "y-websocket/bin/utils";
import * as Y from "yjs";
import WebSocket from "ws";
// HTTP server that the WebSocket server attaches to.
// NOTE(review): `app` is not defined or imported in the visible code — confirm where it comes from.
const server = http.createServer(app);

// WebSocket server bound to the HTTP server created above.
const wss = new WebSocket.Server({ server });

// Registry of Y.Doc instances keyed by room name.
const docs: Map<string, Y.Doc> = new Map();
/**
 * Returns the Y.Doc registered for `roomName`, creating and registering a new
 * one the first time a room is requested.
 *
 * @param roomName - Key identifying the room in the `docs` registry.
 * @returns The existing or newly created document for that room.
 */
const getYDoc = (roomName: string): Y.Doc => {
  // A single lookup lets the compiler narrow the result, so no non-null
  // assertion (and no second map access) is needed.
  let doc = docs.get(roomName);
  if (doc === undefined) {
    doc = new Y.Doc();
    docs.set(roomName, doc);
    console.log(`Created new Y.Doc for room: ${roomName}`);
  }
  return doc;
};
// Handles every incoming WebSocket connection: derives the room name from the
// request path, rejects connections that do not name a room, and hands the
// socket over to y-websocket's connection setup.
wss.on("connection", (conn, req) => {
  console.log("WebSocket connection established");

  // `req.url` is only a path, so a base is required to parse it. The host is
  // irrelevant for the pathname; fall back to a fixed one if the header is missing.
  const { pathname } = new URL(
    req.url || "",
    `http://${req.headers.host ?? "localhost"}`
  );
  // Everything after the leading "/" is treated as the room name.
  const roomName = pathname.slice(1);
  if (!roomName) {
    console.error("No room name provided in the URL");
    conn.close();
    return;
  }

  const doc = getYDoc(roomName);
  wsUtils.setupWSConnection(conn, req, {
    // NOTE(review): y-websocket's setupWSConnection documents only `docName`
    // and `gc` as options; `doc` appears to be ignored and the utils module
    // keeps its own document registry — confirm against the installed version.
    doc,
    // Pass the validated room name explicitly so the document y-websocket
    // serves is keyed by the same name that was checked above.
    docName: roomName,
    gc: true,
  });
  console.log(`Client connected to room: ${roomName}`);
});
// just for dev purposes
// Periodically drops every registered doc whose "state" map has no entries.
setInterval(() => {
  console.log("Cleaning up empty YDocs...");
  for (const [roomName, doc] of docs) {
    const hasState = doc.getMap("state").size !== 0;
    if (hasState) {
      continue;
    }
    docs.delete(roomName);
    console.log(`Deleted empty room: ${roomName}`);
  }
}, 60000); // Run every minute
// The listening port is read from the environment; no fallback is applied here.
const { PORT: port } = process.env;

// Start accepting connections; the callback body is elided in the original.
server.listen(port, () => {
  /////...
});
import {
createLocalStorageAdapter, createPersistor, createStore, parallel, PERSIST_LOADER_ID,
persistStoreMdw, take
} from 'starfx';
import { transform } from './yjsAdapter';
import { initialState, schema } from './schema';
/** Shape of the application state, derived from the schema's initial state. */
export type AppState = typeof initialState;

// Persistor for the allow-listed slices, backed by the local-storage adapter;
// state handed to it passes through the transform imported from ./yjsAdapter.
const persistor = createPersistor<AppState>({
  adapter: createLocalStorageAdapter<AppState>(),
  allowlist: ["applog", "test"],
  transform,
});
// Application store, with the persistor wired in as middleware.
const store = createStore({
  initialState,
  middleware: [persistStoreMdw(persistor)],
});
/**
 * Starts the store's root task and returns the store.
 *
 * The root task rehydrates persisted state, marks the persist loader as
 * successful, then runs the background tasks in parallel. One of those tasks
 * applies every "YJS_UPDATE" action's payload onto the store.
 *
 * NOTE(review): `pName` is accepted but never read in this function.
 *
 * @returns The module-level store instance.
 */
export const setupState = (pName) => {
  // Waits for "YJS_UPDATE" actions forever and copies each payload entry onto
  // the matching top-level state key.
  function* applyYjsUpdates() {
    for (;;) {
      const incoming = yield* take("YJS_UPDATE");
      yield* store.update((draft: AppState) => {
        Object.entries(incoming.payload).forEach(([sliceKey, sliceValue]) => {
          draft[sliceKey] = sliceValue;
        });
      });
    }
  }

  // Root task: rehydrate first, then keep the parallel task group running.
  function* root() {
    yield* persistor.rehydrate();
    yield* schema.update(schema.loaders.success({ id: PERSIST_LOADER_ID }));
    const tasks = yield* parallel([
      //...thunks, sagas, other tasks
      applyYjsUpdates,
    ]);
    yield* tasks;
  }

  store.run(root);
  return store;
};
import sum from 'hash-sum';
import { createTransform } from 'starfx';
import { WebsocketProvider } from 'y-websocket'; // Websocket provider
import * as Y from 'yjs';
import { AppState } from '@/types';
// Service base URL, chosen by build mode: VITE_SERVICE in development,
// VITE_SERVICE_PROD otherwise.
const base =
  process.env.NODE_ENV === "development"
    ? process.env.VITE_SERVICE
    : process.env.VITE_SERVICE_PROD;

// Fail with an actionable message instead of a TypeError on `undefined.replace`.
if (!base) {
  throw new Error(
    "Service base URL is not configured: set VITE_SERVICE (development) or VITE_SERVICE_PROD"
  );
}

// Swap only the leading scheme (http -> ws, https -> wss); the anchor keeps an
// "http" occurring later in the URL from being rewritten.
const wsBase = base.replace(/^http/, "ws");
// Client-side document and the top-level "state" map used for syncing.
const yDoc = new Y.Doc();
const yState = yDoc.getMap("state");

// WebSocket provider for room "starfx-room" under the `/yadapter` path.
const provider = new WebsocketProvider(`${wsBase}/yadapter`, "starfx-room", yDoc);
provider.connect();
// State keys that are mirrored into the shared "state" Y.Map.
const watchedKeys: (keyof AppState)[] = ["test"];

export const transform = createTransform<AppState>();

/**
 * Inbound persist transform: copies each watched slice of `state` into the
 * shared Y.Map when its hash differs from the value already stored there.
 * All writes happen in one transaction tagged with the origin
 * "starfx-local-update".
 *
 * @param state - The (possibly partial) state handed to the transform.
 * @returns The same `state` object, unmodified.
 */
transform.in = function (state: Partial<AppState>) {
  const origin = "starfx-local-update";
  yDoc.transact(() => {
    for (const watchKey of watchedKeys) {
      // A partial state that lacks this slice carries no new value for it;
      // skip it rather than writing `undefined` over the shared value.
      if (!(watchKey in state)) {
        continue;
      }
      const nextValue = state[watchKey];
      const existingValue = yState.get(watchKey);
      // any fast comparer ....
      if (sum(existingValue) !== sum(nextValue)) {
        yState.set(watchKey, nextValue);
      } else {
        console.log('filtered-out');
      }
    }
  }, origin);
  return state;
};
// Listen for Y.js updates
// Collects the watched keys touched by a change to the shared map and, unless
// the change carries the "starfx-local-update" origin, dispatches them as a
// "YJS_UPDATE" action.
yState.observe((event, transaction) => {
  const changes: Partial<AppState> = {};
  for (const key of event.keysChanged) {
    if (!watchedKeys.includes(key as keyof AppState)) {
      continue;
    }
    changes[key] = yState.get(key) as AppState[keyof AppState];
  }

  // Nothing relevant changed.
  if (Object.keys(changes).length === 0) {
    return;
  }

  // Transactions with this origin are the ones opened by transform.in; they
  // are not dispatched back.
  if (transaction.origin === "starfx-local-update") {
    return;
  }

  window.fx.dispatch({ type: "YJS_UPDATE", payload: changes });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment