Skip to content

Instantly share code, notes, and snippets.

@skorfmann
Created November 16, 2024 20:31
Show Gist options
  • Save skorfmann/30ddcee87510139c27ac1b38a5a4714b to your computer and use it in GitHub Desktop.
Save skorfmann/30ddcee87510139c27ac1b38a5a4714b to your computer and use it in GitHub Desktop.
Cloudflare Durable Object relaying Bluesky's Jetstream. It had memory issues, so it was switched to a dedicated container.
import { DurableObject } from "cloudflare:workers";
/**
* Welcome to Cloudflare Workers! This is your first Durable Objects application.
*
* - Run `npm run dev` in your terminal to start a development server
* - Open a browser tab at http://localhost:8787/ to see your Durable Object in action
* - Run `npm run deploy` to publish your application
*
* Bind resources to your worker in `wrangler.toml`. After adding bindings, a type definition for the
* `Env` object can be regenerated with `npm run cf-typegen`.
*
* Learn more at https://developers.cloudflare.com/durable-objects
*/
/**
 * Durable Object that relays emoji activity from Bluesky's Jetstream feed to
 * connected WebSocket clients.
 *
 * A single upstream Jetstream connection is shared by all clients. It is opened
 * when a client connects and no upstream exists, and closed when the last
 * client goes away.
 *
 * Relay -> client messages (JSON text frames):
 * - `{ type: 'clientCount', count }` whenever a client joins or leaves.
 * - `{ type: 'emojis', emojis }` for every created post containing an emoji.
 * - `{ type: 'post', text, url, timestamp, emojis }` additionally, for clients
 *   whose filter set contains at least one of the post's emojis.
 *
 * Client -> relay messages (JSON):
 * - `{ type: 'filter', emoji: '<emoji>' }` adds an emoji to the sender's filters.
 * - `{ type: 'filter', emoji: 'clear' }` empties the sender's filters.
 */
export class SkymoodRelay extends DurableObject {
  /** Jetstream endpoint, restricted to post records. */
  private static readonly UPSTREAM_URL =
    'wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post';

  /** Delay before trying to re-establish a lost upstream connection. */
  private static readonly RECONNECT_DELAY_MS = 5000;

  /**
   * Matches individual emoji code points. Built once instead of per message;
   * sharing a global regex is safe here because `String.prototype.match`
   * resets `lastIndex` before scanning.
   */
  private static readonly EMOJI_PATTERN = /[\p{Emoji_Presentation}\p{Extended_Pictographic}]/gu;

  /** Connected clients mapped to the emojis each one is filtering on. */
  private clients: Map<WebSocket, Set<string>>;
  /** The current upstream socket (connecting or open), or null when there is none. */
  private upstreamSocket: WebSocket | null;
  /** Handle of the pending reconnect timer, or null when none is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null;
  private state: DurableObjectState;
  public env: Env;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.state = state;
    this.env = env;
    this.clients = new Map();
    this.upstreamSocket = null;
    this.reconnectTimer = null;
  }

  /**
   * Accepts a client socket, registers it with an empty filter set, wires up
   * its event handlers and makes sure an upstream connection exists.
   *
   * @param ws - Server side of the client's WebSocket pair.
   */
  async webSocketConnect(ws: WebSocket) {
    ws.accept();
    this.clients.set(ws, new Set());
    console.log(`Client connected. Total clients: ${this.clients.size}`);
    this.broadcastClientCount();

    ws.addEventListener('message', (event) => this.handleClientMessage(ws, event.data));
    ws.addEventListener('close', () => this.removeClient(ws));
    // An errored socket is unusable; drop it the same way as a closed one so it
    // cannot linger in the client map. removeClient() is idempotent, so a
    // following 'close' event is harmless.
    ws.addEventListener('error', (event) => {
      console.error('Client connection error:', event);
      this.removeClient(ws);
    });

    // If we don't have an upstream connection, create one
    if (!this.upstreamSocket) {
      console.log('Initiating upstream connection');
      await this.connectToUpstream();
    }
  }

  /**
   * Opens a fresh upstream connection, replacing (and closing) any existing one.
   *
   * The new socket is stored in `upstreamSocket` immediately rather than on
   * `open`. Otherwise every client that connects while the socket is still
   * connecting would open another upstream socket, and the superseded ones
   * would never be closed.
   */
  private async connectToUpstream() {
    this.closeUpstream();
    console.log('Connecting to Bluesky firehose...');

    let ws: WebSocket;
    try {
      ws = new WebSocket(SkymoodRelay.UPSTREAM_URL);
    } catch (error) {
      console.error('Failed to create upstream connection:', error);
      this.scheduleReconnect();
      return;
    }
    this.upstreamSocket = ws;

    ws.addEventListener('open', () => {
      if (ws === this.upstreamSocket) {
        console.log('Upstream connection established');
      }
    });
    ws.addEventListener('message', (event) => {
      // Only process messages if this is still the active connection
      if (ws !== this.upstreamSocket) return;
      this.handleUpstreamMessage(event.data);
    });
    // Treat 'error' like 'close': whichever arrives first detaches the socket
    // and schedules the reconnect; the other becomes a no-op.
    ws.addEventListener('error', (event) => {
      console.error('Upstream connection error:', event);
      this.handleUpstreamLost(ws);
    });
    ws.addEventListener('close', () => this.handleUpstreamLost(ws));
  }

  /**
   * Routes an incoming request: WebSocket upgrades are accepted and attached
   * to the relay, everything else is rejected with HTTP 400.
   *
   * @param request - Request forwarded to this Durable Object.
   * @returns A 101 response carrying the client end of the socket pair, or a 400.
   */
  async fetch(request: Request): Promise<Response> {
    // The Upgrade header value is case-insensitive (RFC 6455).
    if (request.headers.get('Upgrade')?.toLowerCase() === 'websocket') {
      const pair = new WebSocketPair();
      await this.webSocketConnect(pair[1]);
      return new Response(null, { status: 101, webSocket: pair[0] });
    }
    return new Response('Expected WebSocket', { status: 400 });
  }

  /** Decodes a WebSocket frame payload to text (binary frames are decoded with TextDecoder's default encoding). */
  private static decodeFrame(raw: string | ArrayBuffer): string {
    return typeof raw === 'string' ? raw : new TextDecoder().decode(raw);
  }

  /** Applies a client's `filter` message to its filter set; other messages are ignored. */
  private handleClientMessage(ws: WebSocket, raw: string | ArrayBuffer): void {
    try {
      const message: unknown = JSON.parse(SkymoodRelay.decodeFrame(raw));
      if (typeof message !== 'object' || message === null) return;

      const { type, emoji } = message as { type?: unknown; emoji?: unknown };
      // Only non-empty strings can ever match an extracted emoji.
      if (type !== 'filter' || typeof emoji !== 'string' || emoji === '') return;

      const clientFilters = this.clients.get(ws);
      if (!clientFilters) return;

      if (emoji === 'clear') {
        clientFilters.clear();
      } else {
        clientFilters.add(emoji);
      }
    } catch (error) {
      console.error('Error processing client message:', error);
    }
  }

  /**
   * Extracts the unique emojis from a newly created post and fans the result
   * out to clients. Events that are not post creations, or whose text has no
   * emoji, are dropped.
   */
  private handleUpstreamMessage(raw: string | ArrayBuffer): void {
    try {
      const data = JSON.parse(SkymoodRelay.decodeFrame(raw)) as {
        kind?: unknown;
        did?: unknown;
        commit?: { operation?: unknown; rkey?: unknown; record?: { text?: unknown } };
      } | null;
      if (data?.kind !== 'commit' || data.commit?.operation !== 'create') return;

      const postText = data.commit.record?.text;
      if (typeof postText !== 'string' || postText === '') return;

      // Match against the original text: lower-casing is unnecessary for emoji
      // and maps the emoji Ⓜ (U+24C2) to ⓜ (U+24DC), which no longer matches.
      const emojis = [...new Set(postText.match(SkymoodRelay.EMOJI_PATTERN) ?? [])];
      if (emojis.length === 0) return;

      // Serialize each payload once, not once per client.
      const emojiPayload = JSON.stringify({ type: 'emojis', emojis });
      let postPayload: string | null = null;

      const failures: WebSocket[] = [];
      for (const [client, filters] of this.clients) {
        try {
          // Always send emoji-only message
          client.send(emojiPayload);
          // If filters exist and match, also send full post data
          if (filters.size > 0 && emojis.some((emoji) => filters.has(emoji))) {
            postPayload ??= JSON.stringify({
              type: 'post',
              text: postText,
              url: `https://bsky.app/profile/${data.did}/post/${data.commit.rkey}`,
              timestamp: Date.now(),
              emojis,
            });
            client.send(postPayload);
          }
        } catch (err) {
          console.error('Failed to send to client:', err);
          failures.push(client);
        }
      }

      // Clean up failed connections after the loop so the map is not mutated
      // while it is being iterated.
      for (const failed of failures) {
        this.removeClient(failed);
        console.log('Removed failed client connection');
      }
    } catch (error) {
      console.error('Error processing message:', error);
    }
  }

  /**
   * Unregisters a client, closes its socket, announces the new client count
   * and shuts the upstream down when nobody is left. Calling it again for an
   * already removed client does nothing.
   */
  private removeClient(ws: WebSocket): void {
    if (!this.clients.delete(ws)) return;
    console.log(`Client disconnected. Total clients: ${this.clients.size}`);
    this.broadcastClientCount();
    try {
      ws.close();
    } catch {
      // The socket may already be closed or broken; nothing more to do.
    }
    if (this.clients.size === 0 && (this.upstreamSocket || this.reconnectTimer !== null)) {
      console.log('No more clients, closing upstream connection');
      this.closeUpstream();
    }
  }

  /** Cancels any pending reconnect and closes the current upstream socket, if there is one. */
  private closeUpstream(): void {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    const socket = this.upstreamSocket;
    if (!socket) return;
    // Detach first so the socket's own close/error handlers see it as stale
    // and do not schedule a reconnect.
    this.upstreamSocket = null;
    try {
      socket.close();
    } catch (error) {
      console.error('Error closing upstream connection:', error);
    }
  }

  /** Handles loss of the active upstream socket; events from stale sockets are ignored. */
  private handleUpstreamLost(ws: WebSocket): void {
    // Only attempt reconnection if this was the active connection
    if (ws !== this.upstreamSocket) return;
    console.log('Upstream connection closed');
    this.upstreamSocket = null;
    try {
      ws.close();
    } catch {
      // Already closed.
    }
    this.scheduleReconnect();
  }

  /** Schedules a single reconnect attempt, provided clients are still connected. */
  private scheduleReconnect(): void {
    if (this.clients.size === 0 || this.reconnectTimer !== null) return;
    console.log(
      `Scheduling reconnection attempt in ${SkymoodRelay.RECONNECT_DELAY_MS / 1000} seconds...`,
    );
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // Re-check: clients may have left, or a new client may already have
      // triggered a connection while the timer was pending.
      if (this.clients.size === 0 || this.upstreamSocket) return;
      this.connectToUpstream().catch((error) => {
        console.error('Upstream reconnection failed:', error);
      });
    }, SkymoodRelay.RECONNECT_DELAY_MS);
  }

  /** Sends the current number of connected clients to every client; send failures are logged only. */
  private broadcastClientCount() {
    const message = JSON.stringify({
      type: 'clientCount',
      count: this.clients.size,
    });
    for (const client of this.clients.keys()) {
      try {
        client.send(message);
      } catch (err) {
        console.error('Failed to send client count:', err);
      }
    }
  }
}
export default {
  /**
   * Worker entry point. Every request is handed to the single Durable Object
   * instance named `default-instance`, and that object's response is returned
   * unchanged.
   *
   * @param request - Incoming request from the client
   * @param env - Bindings declared in wrangler.toml; `SKYMOOD_RELAY` is used here
   * @param ctx - Execution context of the Worker (not used by this handler)
   * @returns The response produced by the Durable Object
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const { SKYMOOD_RELAY: relayNamespace } = env;
    const relayStub = relayNamespace.get(relayNamespace.idFromName('default-instance'));
    return relayStub.fetch(request);
  },
} satisfies ExportedHandler<Env>;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment