Skip to content

Instantly share code, notes, and snippets.

@artalar
Last active July 14, 2025 08:25
Show Gist options
  • Save artalar/01274a5a875d22d96515278e1f5a43aa to your computer and use it in GitHub Desktop.
Save artalar/01274a5a875d22d96515278e1f5a43aa to your computer and use it in GitHub Desktop.
Jazz Inbox feature example

Jazz Inbox: RPC-like Server Endpoints

Server Setup

Before implementing inbox-based endpoints, you need to set up a Jazz server worker.

Generating Worker Credentials

Server workers have Jazz accounts with static credentials. Generate new credentials by running:

npx jazz-run account create --name "Balance Service Worker"

This will output:

JAZZ_WORKER_ACCOUNT=co_zH3i4n2V8X...
JAZZ_WORKER_SECRET=sealerSecret_a5DyE7H9c2...

Storing Credentials

Store these credentials as environment variables. Handle the Account Secret like any other secret (DB password, API key, etc.)

# .env file
JAZZ_WORKER_ACCOUNT=co_zH3i4n2V8X...
JAZZ_WORKER_SECRET=sealerSecret_a5DyE7H9c2...

Starting the Worker

import { startWorker } from 'jazz-tools/worker';

const { worker, experimental: { inbox } } = await startWorker({
  syncServer: 'wss://cloud.jazz.tools/[email protected]',
  // Credentials are automatically read from JAZZ_WORKER_ACCOUNT and JAZZ_WORKER_SECRET
  // Or pass them explicitly:
  // accountID: process.env.JAZZ_WORKER_ACCOUNT,
  // accountSecret: process.env.JAZZ_WORKER_SECRET,
});

console.log("Worker started:", worker.id);

Base Guide

The Jazz inbox system enables RPC-like communication for operations requiring server-side validation, atomic updates, or controlled access. Unlike traditional request/response patterns, Jazz leverages automatic data synchronization for reads while using inbox messages only for writes.

Defining the Data Model

Create a balance system where only the worker can modify balances, but users can subscribe to changes:

import { co, z, Group } from "jazz-tools";

// User balance - readable by owner, writable only by worker
export const UserBalance = co.map({
  userId: z.string(),
  balance: z.number(),
  lastUpdated: z.number(),
});

// User's private root to store balance reference
export const UserPrivateRoot = co.map({
  balanceId: z.string(),
});

// Purchase record - immutable audit trail
export const PurchaseRecord = co.map({
  userId: z.string(),
  itemId: z.string(),
  amount: z.number(),
  timestamp: z.number(),
  balanceAfter: z.number(),
});

// Write operations via inbox
export const BuyItemRequest = co.map({
  type: z.literal("buy"),
  itemId: z.string(),
  amount: z.number(),
});

export const CreateAccountRequest = co.map({
  type: z.literal("createAccount"),
  initialBalance: z.number(),
});

export const InboxMessage = z.discriminatedUnion("type", [
  BuyItemRequest,
  CreateAccountRequest,
]);

Implementing Write Operations

The worker handles only write operations through the inbox:

inbox.subscribe(
  InboxMessage,
  async (message, senderAccountID) => {
    const senderAccount = await co.account().load(senderAccountID, { loadAs: worker });
    if (!senderAccount) return;

    switch (message.type) {
      case "createAccount": {
        // Create balance with restricted permissions
        const balanceGroup = Group.create({ owner: worker });
        balanceGroup.addMember(senderAccount, "reader");
        
        const balance = UserBalance.create(
          {
            userId: senderAccountID,
            balance: message.initialBalance,
            lastUpdated: Date.now(),
          },
          { owner: balanceGroup }
        );
        
        // Store balance ID in user's private root for easy access
        const userRoot = senderAccount.root;
        if (userRoot) {
          userRoot.balanceId = balance.id;
          await userRoot.waitForSync();
        }
        
        return balance;
      }

      case "buy": {
        // Load user's balance from their root
        const userRoot = senderAccount.root as UserPrivateRoot | undefined;
        if (!userRoot?.balanceId) throw new Error("No account found");
        
        const balance = await UserBalance.load(userRoot.balanceId, { loadAs: worker });
        if (!balance) throw new Error("Balance not found");
        
        // Validate and update atomically
        if (balance.balance < message.amount) {
          throw new Error("Insufficient balance");
        }
        
        balance.balance -= message.amount;
        balance.lastUpdated = Date.now();
        
        // Create audit record
        const purchaseGroup = Group.create({ owner: worker });
        purchaseGroup.addMember(senderAccount, "reader");
        
        const purchase = PurchaseRecord.create(
          {
            userId: senderAccountID,
            itemId: message.itemId,
            amount: message.amount,
            timestamp: Date.now(),
            balanceAfter: balance.balance,
          },
          { owner: purchaseGroup }
        );
        
        await Promise.all([
          balance.waitForSync(),
          purchase.waitForSync()
        ]);
        
        return purchase;
      }
    }
  },
  { retries: 3 }
);

Client-Side: Writes via Inbox, Reads via Subscription

import { InboxSender, useCoState } from "jazz-tools";
import { UserBalance, BuyItemRequest } from "./schema";

// Initialize once - for write operations
const purchaseService = await InboxSender.load(
  WORKER_ACCOUNT_ID,
  currentUserAccount
);

// Create account (one-time setup)
const balance = await purchaseService.sendMessage(
  CreateAccountRequest.create({
    type: "createAccount",
    initialBalance: 100,
  })
);

// Subscribe to balance changes (real-time updates)
// In React:
const balance = useCoState(UserBalance, currentUserAccount.root?.balanceId);

// In vanilla JS:
const unsubscribe = UserBalance.subscribe(
  currentUserAccount.root?.balanceId,
  (balance) => {
    console.log("Current balance:", balance?.balance);
  }
);

// Make purchases through inbox
try {
  const purchase = await purchaseService.sendMessage(
    BuyItemRequest.create({
      type: "buy",
      itemId: "item-123",
      amount: 25,
    })
  );
  // Balance will auto-update via subscription
} catch (error) {
  console.error("Purchase failed:", error.message);
}

Interesting Points

Read vs Write Separation

  • Reads are automatic: Clients subscribe directly to CoValues they have permission to read
  • Writes go through inbox: Only operations that modify state need inbox messages
  • No "get" methods needed: Jazz's automatic sync eliminates request/response for reads
  • Real-time updates: All permitted clients see changes immediately

State Machine Pattern

Instead of request/response, use state transitions:

export const Order = co.map({
  status: z.enum(["draft", "processing", "completed", "failed"]),
  items: z.array(z.string()),
  total: z.number(),
  error: z.optional(z.string()),
});

// Client creates order in "draft" state
const order = Order.create({ 
  status: "draft", 
  items: ["item-1"], 
  total: 50 
});

// Client transitions to "processing"
order.status = "processing";

// Worker subscribes to orders and processes them
Order.subscribe(orderId, (order) => {
  if (order?.status === "processing") {
    processOrder(order);
  }
});

Permission Patterns

1. Worker-Only Write

const group = Group.create({ owner: worker });
group.addMember(user, "reader");
// User can read but not write

2. Shared Write Access

const group = Group.create({ owner: worker });
group.addMember(user, "writer");
// Both can modify

3. Public Read

group.addMember("everyone", "reader");
// Anyone can read

Advanced Inbox Patterns

1. Batch Processing

const batchProcessor = new Map<string, BuyItemRequest[]>();

inbox.subscribe(InboxMessage, async (message, senderID) => {
  if (message.type === "buy") {
    // Accumulate requests
    const batch = batchProcessor.get(senderID) || [];
    batch.push(message);
    batchProcessor.set(senderID, batch);
    
    // Process when batch is full or on timeout
    if (batch.length >= 10) {
      await processBatch(senderID, batch);
      batchProcessor.delete(senderID);
    }
  }
});

2. External Service Integration

case "buy": {
  // Call external API
  const paymentResult = await stripeAPI.charge({
    amount: message.amount,
    currency: "usd",
  });
  
  if (paymentResult.success) {
    balance.balance -= message.amount;
    balance.paymentId = paymentResult.id;
  } else {
    throw new Error("Payment failed: " + paymentResult.error);
  }
}

3. Rate Limiting with CoMaps

export const RateLimitMap = co.map({
  requests: z.record(z.number()), // userId -> timestamp
});

const rateLimits = RateLimitMap.create({}, { owner: worker });

inbox.subscribe(InboxMessage, async (message, senderID) => {
  const now = Date.now();
  const userRequests = Object.values(rateLimits.requests)
    .filter(time => now - time < 60000) // Last minute
    .length;
    
  if (userRequests >= 10) {
    throw new Error("Rate limit exceeded");
  }
  
  rateLimits.requests[senderID] = now;
  // ... process message
});

Testing Strategies

// Integration test
const testWorker = await startWorker({ ... });
const testAccount = await Account.create({ ... });

// Subscribe to balance before making changes
let balanceUpdates = 0;
UserBalance.subscribe(testAccount.root?.balanceId, () => {
  balanceUpdates++;
});

// Send purchase request
const sender = await InboxSender.load(testWorker.id, testAccount);
await sender.sendMessage(BuyItemRequest.create({
  type: "buy",
  itemId: "test-item",
  amount: 10,
}));

// Verify real-time update
expect(balanceUpdates).toBeGreaterThan(0);

Performance & Architecture

  • Optimistic UI: Update UI immediately, revert on error
  • Offline support: Inbox messages queue automatically
  • Horizontal scaling: Multiple workers can share inbox processing
  • Event sourcing: Purchase records create an immutable audit log

Based on the Jazz documentation, workers are full Jazz accounts that participate in the permission system, making them ideal for controlled server-side operations while maintaining Jazz's real-time collaborative nature.

@artalar
Copy link
Author

artalar commented Jul 9, 2025

import { action, withAsync } from "@reatom/core"
import {
	Account,
	type CoMapSchema,
	type CoValue,
	co,
	Group,
	type Inbox,
	InboxSender,
} from "jazz-tools"
import { type ZodType, z } from "zod/v4"
import type { $ZodObject } from "zod/v4/core"

type Rec = Record<string, any>

type OverloadProperties =
	| "create"
	| "load"
	| "subscribe"
	| "upsertUnique"
	| "loadUnique"
	| "catchall"
	| "withMigration"
	| keyof $ZodObject
	| keyof ZodType

type CoMapBaseSchema = Omit<CoMapSchema<any>, OverloadProperties>

const WorkerAccountID = z.string("Worker account ID is required")

const handlers: {
	method: string
	message: CoMapSchema<{
		method: string
		params: Rec
	}>
	result: CoMapSchema<Rec>
	handler: (options: {
		worker: Account
		params: Rec
		senderAccountID: string
	}) => Promise<Rec | CoMapSchema<Rec>>
}[] = []

const createdMethods = new Set<string>()

/**
 * Defines a type-safe Jazz RPC method with client and server implementations
 * 
 * @param method - Unique identifier for the RPC method (e.g., "email-auth.sendCode")
 * @param Params - CoMap schema defining the parameters structure
 * @param Result - CoMap schema defining the result structure (optional, defaults to empty map)
 * 
 * @returns Object containing `client` and `serverHandler` functions
 * 
 * @example
 * ```typescript
 * // Define RPC method
 * export const TopUpBalanceRpc = defineJazzRPC(
 *   "payment.topUpBalance",
 *   co.map({
 *     amount: z.number().positive(),
 *     currency: z.enum(["USD", "EUR"]),
 *   }),
 *   co.map({
 *     success: z.boolean(),
 *     newBalance: z.number(),
 *   })
 * )
 * 
 * // Client usage
 * const topUpBalance = await TopUpBalanceRpc.client(account)
 * const result = await topUpBalance({ amount: 100, currency: "USD" })
 * 
 * // Server handler
 * TopUpBalanceRpc.serverHandler(async ({ worker, params, senderAccountID }) => {
 *   const userAccount = await Account.load(senderAccountID, { loadAs: worker })
 *   const payment = await processPayment(params.amount, params.currency)
 *   
 *   return {
 *     success: payment.success,
 *     newBalance: userAccount.balance + params.amount,
 *   }
 * })
 * ```
 */
export function defineJazzRPC<
	ParamsSchema extends CoMapBaseSchema,
	ResultSchema extends CoMapBaseSchema = CoMapSchema<{}>,
>(
	method: string,
	Params: ParamsSchema,
	Result: ResultSchema = co.map({}) as unknown as ResultSchema,
) {
	if (createdMethods.has(method)) {
		throw new Error(
			`RPC method "${method}" has already been created. Each method must have a unique name.`,
		)
	}
	createdMethods.add(method)

	const MessageSchema = co.map({
		method: z.literal(method),
		params: Params,
	})

	let isServerSetup = false

	// @ts-expect-error generics
	type ParamsLoaded = co.loaded<ParamsSchema>
	// @ts-expect-error generics
	type ParamsJSON = Parameters<ParamsSchema["create"]>[0]

	// @ts-expect-error generics
	type ResultLoaded = co.loaded<ResultSchema>
	// @ts-expect-error generics
	type ResultJSON = Parameters<ResultSchema["create"]>[0]

	return {
		/**
		 * Create a client for making RPC calls
		 */
		client: async (account: Account) => {
			const workerAccountID = WorkerAccountID.parse(
				import.meta.env.VITE_JAZZ_RPC_ACCOUNT,
			)
			const sender = await InboxSender.load(workerAccountID, account)

			return action(async (params: ParamsJSON): Promise<ResultLoaded> => {
				const group = Group.create(account)
				const paramsLoaded = (Params as unknown as CoMapSchema<any>).create(
					params as any,
					group,
				)
				const message = MessageSchema.create(
					// @ts-ignore generics
					{ method, params: paramsLoaded },
					group,
				)

				const resultId = await sender.sendMessage(message)

				if (!resultId) {
					throw new Error(`[${method}] Result ID is null`)
				}

				const result = await Result
					// @ts-expect-error generic typecast
					.load(resultId, { loadAs: account, resolve: { $each: true } })

				if (!result) {
					throw new Error(`[${method}] Empty result`)
				}

				return result
			}, method).extend(withAsync())
		},

		/**
		 * Create a server handler for this RPC method
		 */
		serverHandler(
			handler: (options: {
				worker: Account
				params: ParamsJSON
				senderAccountID: string
			}) => Promise<
				ResultLoaded | ResultJSON | ({} extends ResultJSON ? void : never)
			>,
		) {
			if (isServerSetup) {
				throw new Error("Server handler already setup")
			}
			isServerSetup = true

			handlers.push({
				method,
				// @ts-expect-error generic typecast
				message: MessageSchema,
				// @ts-expect-error generic typecast
				result: Result,
				// @ts-expect-error generic typecast
				handler: action(handler, method).extend(withAsync()),
			})
		},
	}
}

const MessagesBase = z.discriminatedUnion("method", [
	co.map({
		method: z.string(),
		params: z.any(),
	}),
])

/**
 * Creates a Jazz RPC server that listens for incoming messages and routes them to registered handlers
 *
 * @param inbox - The Jazz inbox to listen on for incoming RPC messages
 *
 * @example
 * ```typescript
 * // Complete setup in worker (server/index.ts)
 * import "./auth.worker" // Import handlers first
 * import { startWorker } from "jazz-tools/worker"
 * import { createJazzRPCServer } from "../shared/lib/jazz"
 * import z from "zod/v4"
 * 
 * // Wait for Jazz sync server to be ready
 * await new Promise((r) => setTimeout(r, 1000))
 * 
 * // Initialize worker with environment variables
 * const {
 *   worker,
 *   experimental: { inbox },
 * } = await startWorker({
 *   syncServer: z.string().parse(import.meta.env.JAZZ_SYNC_URL),
 *   accountID: z.string().parse(import.meta.env.JAZZ_RPC_ACCOUNT),
 *   accountSecret: z.string().parse(import.meta.env.JAZZ_RPC_SECRET),
 * })
 * 
 * // Start RPC server - automatically registers all defined handlers
 * createJazzRPCServer(worker, inbox.subscribe)
 * ```
 */
export const createJazzRPCServer = action(
	(worker: Account, subscribeInbox: Inbox["subscribe"]) => {
		const methods = handlers.map(({ method }) => method)

		const Messages = z.discriminatedUnion(
			"method",
			// @ts-ignore generics
			handlers.map(({ message }) => message),
		) as typeof MessagesBase

		subscribeInbox(
			Messages,
			async (message, senderAccountID): Promise<CoValue> => {
				const handler = handlers.find(({ method }) => method === message.method)

				if (!handler) {
					throw new Error(`Handler for method ${message.method} not found`)
				}

				const messageLoaded = await handler.message.load(message.id, {
					loadAs: worker,
					// @ts-expect-error generics
					resolve: { $each: true },
				})

				if (!messageLoaded) {
					throw new Error(`Message ${message.id} not found`)
				}

				const result =
					(await handler.handler({
						worker,
						params: messageLoaded.params,
						senderAccountID,
					})) ?? {}

				if ("_zod" in result) {
					return result as CoValue
				}

				const group = Group.create({ owner: worker })
				const senderAccount = await Account.load(senderAccountID, {
					loadAs: worker,
				})
				if (senderAccount) {
					group.addMember(senderAccount, "reader")
				}

				return handler.result.create(result, {
					owner: group,
				})
			},
			{ retries: 0 },
		)

		return methods
	},
	"createJazzRPCServer",
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment