Skip to content

Instantly share code, notes, and snippets.

@schickling
Last active July 30, 2025 18:02
Show Gist options
  • Save schickling/7b5b19fb7eb4ae9a69c6d8ea11955b94 to your computer and use it in GitHub Desktop.
Save schickling/7b5b19fb7eb4ae9a69c6d8ea11955b94 to your computer and use it in GitHub Desktop.
Cloudflare Durable Object RPC Protocol Implementation with Effect - Complete implementation of protocol-durable-object.ts with comprehensive tests showing how to use Effect RPC between Durable Objects using Cloudflare's native RPC mechanism.
import { describe, expect, it } from 'vitest'
/**
 * Test Architecture - Effect RPC between 2 Durable Objects
 *
 * ┌─────────────┐    HTTP     ┌─────────────────┐
 * │ Test Client │ ──────────▶ │ Worker (router) │
 * │  (vitest)   │             └─────────────────┘
 * └─────────────┘                      │
 *                                      │ routes to DOs
 *          ┌────────────────────────────────────────────┐
 *          │                                            │
 *          │ /test-rpc-client                           │ /test-rpc
 *          ▼                                            ▼
 * ┌─────────────────┐         CF DO RPC         ┌─────────────────┐
 * │ Client DO       │ ─────────────────────────▶│ Server DO       │
 * │                 │   serverDO.rpc(payload)   │                 │
 * │ RpcClient.make  │                           │ toDurableObject │
 * │ TestRpcs        │                           │ Handler         │
 * │                 │                           │                 │
 * │ client.Ping()   │                           │ TestRpcs.toLayer│
 * │ client.Echo()   │                           │                 │
 * │ client.Add()    │                           │ Ping/Echo/Add   │
 * └─────────────────┘                           └─────────────────┘
 *
 * Test Path: Test → Worker → Client DO → Server DO (full Effect RPC)
 */
describe('Durable Object RPC', { timeout: 5000 }, () => {
  /**
   * Issues a GET against the worker's `/test-rpc-client` route using the given
   * raw query string. The port is read from `LIVESTORE_SYNC_PORT` on every call.
   */
  const callClient = (query: string): Promise<Response> => {
    const port = process.env.LIVESTORE_SYNC_PORT
    return fetch(`http://localhost:${port}/test-rpc-client?${query}`)
  }

  // Idiomatic Effect RPC client tests: one row per RPC method that is expected to succeed.
  const successCases = [
    {
      method: 'ping',
      query: 'method=ping&message=Hello RPC Client',
      expected: { response: 'Pong: Hello RPC Client' },
    },
    {
      method: 'echo',
      query: 'method=echo&text=Echo via RPC Client',
      expected: { echo: 'Echo: Echo via RPC Client' },
    },
    {
      method: 'add',
      query: 'method=add&a=10&b=20',
      expected: { result: 30 },
    },
  ]

  for (const { method, query, expected } of successCases) {
    it(`should use RPC client to call ${method} method`, async () => {
      const response = await callClient(query)
      expect(response.status).toBe(200)
      const body: any = await response.json()
      expect(body.success).toBe(true)
      expect(body.result).toEqual(expected)
    })
  }

  // An unrecognised method must surface as a 500 carrying the error text.
  it('should handle RPC client errors', async () => {
    const response = await callClient('method=unknown')
    expect(response.status).toBe(500)
    const body: any = await response.json()
    expect(body.success).toBe(false)
    expect(body.error).toContain('Unknown method')
  })
})
import {
Effect,
Headers,
Layer,
type Rpc,
RpcClient,
type RpcGroup,
RpcSerialization,
type Scope,
} from '@livestore/utils/effect'
/**
 * Builds an `RpcClient.Protocol` layer whose transport is a Durable Object RPC call.
 *
 * @param callRpc - Sends one serialized payload to the remote Durable Object and
 *   resolves with its serialized response (e.g. `(payload) => stub.rpc(payload)`).
 * @returns A scoped layer providing `RpcClient.Protocol`; it still requires
 *   `RpcSerialization` to be provided by the caller.
 */
export const layerProtocolDurableObject = (
  callRpc: (payload: unknown) => Promise<unknown>,
): Layer.Layer<RpcClient.Protocol, never, RpcSerialization.RpcSerialization> => {
  // The protocol is constructed as a scoped effect, so it is lifted with `Layer.scoped`.
  const protocol = makeProtocolDurableObject(callRpc)
  return Layer.scoped(RpcClient.Protocol, protocol)
}
/**
 * Builds a Durable Object RPC entry point for an `RpcGroup` — the Durable Object
 * counterpart of `RpcServer.toWebHandler`, without the HTTP layer.
 *
 * The returned function decodes one serialized payload with the
 * `RpcSerialization` service from `options.layer`, runs every `Request`
 * message against the matching handler found in the same layer, and resolves
 * with the encoded list of `Exit` responses (one per processed request).
 *
 * @param group - RPC group whose `requests` map is used to resolve each incoming tag.
 * @param options - `layer` supplies the handlers and the serialization service.
 *   NOTE(review): `disableTracing`, `spanPrefix` and `spanAttributes` are accepted
 *   but not read anywhere in this implementation.
 * @returns A function from a serialized request payload to a promise of the
 *   serialized responses.
 */
export const toDurableObjectHandler = <Rpcs extends Rpc.Any, LE>(
  group: RpcGroup.RpcGroup<Rpcs>,
  options: {
    readonly layer: Layer.Layer<Rpc.ToHandler<Rpcs> | Rpc.Middleware<Rpcs> | RpcSerialization.RpcSerialization, LE>
    readonly disableTracing?: boolean | undefined
    readonly spanPrefix?: string | undefined
    readonly spanAttributes?: Record<string, unknown> | undefined
  },
): ((serializedPayload: unknown) => Promise<unknown>) => {
  /**
   * Normalises the decoded payload to a flat list of messages:
   * `[[{...}]]` -> `[{...}]`, `[{...}]` stays as is, anything else -> `[]`.
   */
  const toRequestList = (decoded: unknown): any[] => {
    if (!Array.isArray(decoded)) {
      return []
    }
    const isDoubleWrapped = decoded.length === 1 && Array.isArray(decoded[0])
    return isDoubleWrapped ? decoded[0] : decoded
  }

  /** Shapes a failed `Exit` response for the given request id. */
  const failureExit = (requestId: unknown, cause: unknown) => ({
    _tag: 'Exit' as const,
    requestId,
    exit: { _tag: 'Failure' as const, cause },
  })

  /**
   * Runs a single handler and always succeeds with an `Exit` response;
   * any failure or defect raised by the handler is captured as the exit cause.
   */
  const runHandler = (entry: Rpc.Handler<Rpcs['_tag']>, message: any) =>
    Effect.gen(function* () {
      const outcome = entry.handler(message.payload, Headers.empty)
      // Handlers may return an Effect (which is run here) or a non-Effect value such as a Stream (passed through as is)
      const value = Effect.isEffect(outcome) ? yield* outcome : outcome
      return {
        _tag: 'Exit' as const,
        requestId: message.id,
        exit: { _tag: 'Success' as const, value },
      }
    }).pipe(Effect.catchAllCause((cause) => Effect.succeed(failureExit(message.id, cause))))

  return (serializedPayload: unknown): Promise<unknown> => {
    const program = Effect.gen(function* () {
      const serialization = yield* RpcSerialization.RpcSerialization
      const codec = serialization.unsafeMake()

      // The client sends an array of requests, possibly wrapped once more by serialization
      const incoming = toRequestList(codec.decode(serializedPayload as string | Uint8Array))
      const outgoing: any[] = []

      // Handlers live in the provided context, keyed by each RPC's `key`
      const handlers = yield* Effect.context<Rpc.ToHandler<Rpcs> | Rpc.Middleware<Rpcs>>()

      for (const message of incoming) {
        // Only `Request` messages are processed; everything else is ignored
        if (message._tag !== 'Request') {
          continue
        }

        const rpc = group.requests.get(message.tag) as any
        const entry = handlers.unsafeMap.get(rpc?.key) as Rpc.Handler<Rpcs['_tag']>

        if (rpc && entry) {
          outgoing.push(yield* runHandler(entry, message))
        } else {
          // Unknown tag or missing handler: answer with a `Die` cause instead of throwing
          outgoing.push(failureExit(message.id, { _tag: 'Die', defect: `Unknown request tag: ${message.tag}` }))
        }
      }

      return codec.encode(outgoing)
    })

    return program.pipe(Effect.provide(options.layer), Effect.scoped, Effect.runPromise)
  }
}
/**
 * Client-side `RpcClient.Protocol` implementation backed by Durable Object RPC.
 *
 * Each `Request` message is encoded as a one-element array, sent through
 * `callRpc`, and every decoded response is forwarded to `writeResponse`.
 * Messages with any other `_tag` are ignored.
 *
 * @param callRpc - Transports one serialized payload to the Durable Object and
 *   resolves with the serialized response.
 * @returns An effect producing the protocol; it requires a `Scope` and `RpcSerialization`.
 */
const makeProtocolDurableObject = (
  callRpc: (payload: unknown) => Promise<unknown>,
): Effect.Effect<RpcClient.Protocol['Type'], never, Scope.Scope | RpcSerialization.RpcSerialization> =>
  RpcClient.Protocol.make(
    Effect.fnUntraced(function* (writeResponse) {
      const serialization = yield* RpcSerialization.RpcSerialization
      const codec = serialization.unsafeMake()

      /**
       * Normalises a decoded response to a flat list:
       * `[[Exit]]` -> `[Exit]`, `[Exit]` stays as is, a non-array value -> `[value]`.
       */
      const toResponseList = (decoded: unknown): any[] => {
        if (!Array.isArray(decoded)) {
          return [decoded]
        }
        return decoded.length === 1 && Array.isArray(decoded[0]) ? decoded[0] : decoded
      }

      /** Performs the round trip for an already-encoded payload and emits each response. */
      const roundTrip = (encoded: unknown): Effect.Effect<void, any> =>
        Effect.gen(function* () {
          const rawResponse = yield* Effect.tryPromise({
            try: () => callRpc(encoded),
            catch: (cause) => ({
              _tag: 'ProtocolError' as const,
              message: 'Failed to send Durable Object RPC payload',
              cause,
            }),
          })

          // The Durable Object returns serialized responses, so they are decoded before dispatch
          const responses = toResponseList(codec.decode(rawResponse as string | Uint8Array))
          for (const response of responses) {
            yield* writeResponse(response)
          }
        })

      const send = (payload: any): Effect.Effect<void, any> => {
        if (payload._tag === 'Request') {
          // The server expects an array of requests, so the single request is wrapped before encoding
          return roundTrip(codec.encode([payload]))
        }
        return Effect.void
      }

      return {
        send,
        supportsAck: false, // DO RPC doesn't support ack mechanism like WebSockets
        supportsTransferables: false, // DO RPC doesn't support transferables yet
      }
    }),
  )
/// <reference types="@cloudflare/workers-types" />
import { DurableObject } from 'cloudflare:workers'
import { layerProtocolDurableObject, toDurableObjectHandler } from '@livestore/common-cf'
import { Effect, Layer, Rpc, RpcClient, RpcGroup, RpcSerialization, Schema } from '@livestore/utils/effect'
/** `Ping`: takes a `message` string and answers with a `response` string. */
const PingRpc = Rpc.make('Ping', {
  payload: Schema.Struct({ message: Schema.String }),
  success: Schema.Struct({ response: Schema.String }),
})

/** `Echo`: takes a `text` string and answers with an `echo` string. */
const EchoRpc = Rpc.make('Echo', {
  payload: Schema.Struct({ text: Schema.String }),
  success: Schema.Struct({ echo: Schema.String }),
})

/** `Add`: takes two numbers `a` and `b` and answers with a numeric `result`. */
const AddRpc = Rpc.make('Add', {
  payload: Schema.Struct({ a: Schema.Number, b: Schema.Number }),
  success: Schema.Struct({ result: Schema.Number }),
})

/** RPC group shared by the client and server Durable Objects in this fixture. */
export class TestRpcs extends RpcGroup.make(PingRpc, EchoRpc, AddRpc) {}
/** Bindings made available to the worker and to both Durable Object classes. */
export interface Env {
  /** Namespace of the server Durable Object (`TestRpcDurableObject`) that exposes `rpc`. */
  TEST_RPC_DO: DurableObjectNamespace<TestRpcDurableObject>
  /** Namespace from which the worker obtains the client Durable Object it forwards requests to. */
  TEST_RPC_CLIENT_DO: DurableObjectNamespace
}
/** Server-side Durable Object: answers `TestRpcs` requests delivered through `rpc`. */
export class TestRpcDurableObject extends DurableObject {
  __DURABLE_OBJECT_BRAND = 'TestRpcDurableObject' as never

  /**
   * Handles one serialized RPC payload and resolves with the serialized responses.
   * The handler layer and the handler function are constructed on every call.
   */
  async rpc(payload: unknown): Promise<unknown> {
    const handlers = TestRpcs.toLayer({
      Ping: (req) => Effect.succeed({ response: `Pong: ${req.message}` }),
      Echo: (req) => Effect.succeed({ echo: `Echo: ${req.text}` }),
      Add: (req) => Effect.succeed({ result: req.a + req.b }),
    })
    // Handlers and JSON serialization are provided together as one layer
    const serverLayer = Layer.mergeAll(handlers, RpcSerialization.layerJson)
    const handle = toDurableObjectHandler(TestRpcs, { layer: serverLayer })
    return handle(payload)
  }
}
/**
 * Client-side Durable Object: on `/call-server` it builds an Effect RPC client
 * over Durable Object RPC and invokes the method named in the query string.
 */
export class TestRpcClientDO extends DurableObject {
  __DURABLE_OBJECT_BRAND = 'TestRpcClientDO' as never
  readonly env: Env

  constructor(state: DurableObjectState, env: Env) {
    super(state, env)
    this.env = env
  }

  /**
   * Responds with `{ success: true, result }` as JSON when the RPC call succeeds,
   * a plain-text 404 for any path other than `/call-server`, and
   * `{ success: false, error }` with status 500 when anything throws or fails.
   */
  async fetch(request: Request): Promise<Response> {
    const jsonHeaders = { 'Content-Type': 'application/json' }

    try {
      const url = new URL(request.url)
      if (url.pathname !== '/call-server') {
        return new Response('Not found', { status: 404 })
      }

      const params = url.searchParams
      const method = params.get('method')

      const serverId = this.env.TEST_RPC_DO.idFromName('test-server')
      const serverDO = this.env.TEST_RPC_DO.get(serverId)

      // Protocol layer that ships each serialized request through the server DO's `rpc` method
      const ProtocolLive = layerProtocolDurableObject((payload) => serverDO.rpc(payload)).pipe(
        Layer.provide(RpcSerialization.layerJson),
      )

      const program = Effect.gen(function* () {
        const client = yield* RpcClient.make(TestRpcs)

        if (method === 'ping') {
          return yield* client.Ping({ message: params.get('message') || 'Hello' })
        }
        if (method === 'echo') {
          return yield* client.Echo({ text: params.get('text') || 'Hello World' })
        }
        if (method === 'add') {
          // Missing or empty operands fall back to 5 and 3 respectively
          const a = Number.parseInt(params.get('a') || '5')
          const b = Number.parseInt(params.get('b') || '3')
          return yield* client.Add({ a, b })
        }
        return yield* Effect.fail(new Error(`Unknown method: ${method}`))
      })

      const result = await Effect.runPromise(Effect.provide(Effect.scoped(program), ProtocolLive))
      return new Response(JSON.stringify({ success: true, result }), { headers: jsonHeaders })
    } catch (error) {
      return new Response(JSON.stringify({ success: false, error: String(error) }), {
        status: 500,
        headers: jsonHeaders,
      })
    }
  }
}
/**
 * Worker entry point. `/test-rpc-client` is forwarded to the client Durable
 * Object as `/call-server` with the same query parameters (`method` defaults
 * to `ping`); every other path gets a plain-text usage message.
 */
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    try {
      const incoming = new URL(request.url)

      if (incoming.pathname !== '/test-rpc-client') {
        return new Response('Idiomatic Effect RPC Test\n\nEndpoints:\n- /test-rpc-client?method=ping|echo|add', {
          headers: { 'Content-Type': 'text/plain' },
        })
      }

      // Idiomatic Effect RPC client testing
      const clientId = env.TEST_RPC_CLIENT_DO.idFromName('test-client')
      const clientDO = env.TEST_RPC_CLIENT_DO.get(clientId)

      const target = new URL('/call-server', request.url)
      target.searchParams.set('method', incoming.searchParams.get('method') ?? 'ping')

      // Forward every remaining parameter; `method` has already been set above
      incoming.searchParams.forEach((value, key) => {
        if (key !== 'method') {
          target.searchParams.set(key, value)
        }
      })

      return clientDO.fetch(target.toString())
    } catch (error) {
      return new Response(`Error: ${error}`, { status: 500 })
    }
  },
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment