Skip to content

Instantly share code, notes, and snippets.

@tim-smart
Last active December 11, 2024 01:06
Show Gist options
  • Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.
Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.
import {
Data,
Effect,
Layer,
pipe,
PubSub,
Schedule,
Scope,
Stream,
} from "effect"
import { Socket } from "@effect/platform"
import type {
At,
ComAtprotoSyncSubscribeRepos,
Records,
} from "@atcute/client/lexicons"
import "@atcute/bluesky/lexicons"
import { NodeRuntime, NodeSocket } from "@effect/platform-node"
import { Jetstream as SWJetstream } from "@skyware/jetstream"
/**
 * Tagged error (tag `"JetstreamError"`) describing a failure while handling
 * Jetstream data.
 *
 * In this file it is created by the socket message handler inside
 * `Jetstream.subscribe` when processing an incoming message throws.
 */
export class JetstreamError extends Data.TaggedError("JetstreamError")<{
/** Human-readable description of what went wrong. */
message: string
/** The underlying value that was thrown or caught. */
cause: unknown
}> {}
/**
 * Effect service that connects to a Jetstream WebSocket endpoint and
 * republishes the events it receives.
 *
 * Building the service requires `Socket.WebSocketConstructor`. It exposes:
 * - `subscribe(options)`: scoped effect that opens the connection in the
 *   background and returns a `PubSub` carrying the validated events.
 * - `stream(options)`: the same events exposed as a `Stream`.
 *
 * Options (all optional):
 * - `endpoint`: WebSocket URL; defaults to the public `jetstream1.us-east`
 *   endpoint.
 * - `wantedCollections`: each entry is appended as a `wantedCollections`
 *   query parameter.
 * - `startCursor`: initial cursor; it is sent as the `cursor` query parameter
 *   only when greater than zero.
 */
class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
  effect: Effect.gen(function* () {
    // access the WebSocket constructor for the current platform
    const ws = yield* Socket.WebSocketConstructor

    // Shared decoder for frames that are delivered as bytes instead of text.
    const decoder = new TextDecoder()

    const subscribe = <
      WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
    >(
      options: {
        readonly endpoint?: string
        readonly wantedCollections?: Array<WantedCollections>
        readonly startCursor?: number
      } = {},
    ) =>
      Effect.gen(function* () {
        /** Every event shape that can be published for this subscription. */
        type JetstreamEvent =
          | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
          | AccountEvent
          | IdentityEvent

        const scope = yield* Effect.scope

        // create a URL for the Jetstream endpoint
        const url = new URL(
          options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe",
        )
        options.wantedCollections?.forEach((collection) => {
          url.searchParams.append("wantedCollections", collection)
        })

        // create the pubsub that fans events out to subscribers; it is shut
        // down when the surrounding scope closes
        const pubsub = yield* PubSub.unbounded<JetstreamEvent>()
        yield* Scope.addFinalizer(scope, pubsub.shutdown)

        // track the cursor for the last event received
        let cursor = options.startCursor ?? 0

        // create a WebSocket connection to the Jetstream endpoint
        // The URL is an effect, so it is rebuilt from the current cursor each
        // time the socket is (re)opened.
        const socket = yield* Socket.makeWebSocket(
          Effect.sync(() => {
            if (cursor > 0) {
              url.searchParams.set("cursor", cursor.toString())
            }
            return url.toString()
          }),
        ).pipe(Effect.provideService(Socket.WebSocketConstructor, ws))

        yield* pipe(
          socket.runRaw((message) => {
            try {
              // Frames may arrive as text or as bytes; decode bytes instead of
              // pretending they are already a string.
              const text =
                typeof message === "string" ? message : decoder.decode(message)
              const event = JSON.parse(text) as JetstreamEvent

              // The payload is unvalidated JSON: only let a numeric timestamp
              // take part in cursor tracking so a malformed event cannot
              // clobber the resume position.
              if (typeof event.time_us === "number") {
                // ignore events older than the last one seen
                if (event.time_us < cursor) return
                cursor = event.time_us
              }

              switch (event.kind) {
                case EventType.Commit: {
                  // drop commits missing their identifying fields
                  if (
                    !event.commit?.collection ||
                    !event.commit.rkey ||
                    !event.commit.rev
                  ) {
                    return
                  }
                  // a create without a record carries nothing to publish
                  if (
                    event.commit.operation === CommitType.Create &&
                    !event.commit.record
                  ) {
                    return
                  }
                  pubsub.unsafeOffer(event)
                  return
                }
                case EventType.Account: {
                  if (!event.account?.did) return
                  pubsub.unsafeOffer(event)
                  return
                }
                case EventType.Identity: {
                  if (!event.identity?.did) return
                  pubsub.unsafeOffer(event)
                  return
                }
              }
            } catch (error) {
              // Returned (not thrown) to the socket handler, exactly as before.
              return new JetstreamError({
                message: "Failed to parse event",
                cause: error,
              })
            }
          }),
          Effect.tapErrorCause(Effect.logWarning),
          // Retry with exponential backoff starting at 1 second, combined with
          // a fixed 10 second spacing via `Schedule.union`.
          Effect.retry(
            Schedule.exponential("1 second").pipe(
              Schedule.union(Schedule.spaced("10 second")),
            ),
          ),
          // run the connection in the background, tied to the caller's scope
          Effect.forkIn(scope),
        )

        return pubsub
      })

    const stream = <
      WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
    >(
      options: {
        readonly endpoint?: string
        readonly wantedCollections?: Array<WantedCollections>
        readonly startCursor?: number
      } = {},
    ) =>
      subscribe(options).pipe(
        Effect.map((pubsub) => Stream.fromPubSub(pubsub)),
        Stream.unwrapScoped,
      )

    return { subscribe, stream } as const
  }),
}) {}
// ----------------------------------------------------------------------------
// Example program: open a Jetstream stream with `wantedCollections` set to
// `app.bsky.feed.post` and log every event it emits.
const logFeedPosts = Effect.gen(function* () {
  const service = yield* Jetstream
  const posts = service.stream({ wantedCollections: ["app.bsky.feed.post"] })
  yield* Stream.runForEach(posts, Effect.log)
})

// The default Jetstream layer with its WebSocket constructor requirement
// satisfied by the Node implementation.
const JetstreamLive = Layer.provide(
  Jetstream.Default,
  NodeSocket.layerWebSocketConstructor,
)

// Run the program as the process entry point.
NodeRuntime.runMain(Effect.provide(logFeedPosts, JetstreamLive))
// ----------------------------------------------------------------------------
/**
 * Resolves a lexicon name to its record type.
 *
 * Yields `Records[T]` when `T` is a key of `Records`; otherwise falls back to
 * the minimal shape `{ $type: T }`.
 */
export type ResolveLexicon<T extends string> = T extends keyof Records
? Records[T]
: { $type: T }
/**
 * Checks if any member of a union is assignable to a given type.
 *
 * Evaluates to `true` when at least one member of `Union` extends
 * `AssignableTo`, and to `never` when no member does.
 */
type UnionMemberIsAssignableTo<Union, AssignableTo> =
// Distribute over union members
Union extends Union
? // `Union` here refers to a given union member
Union extends AssignableTo
? true
: never
: never
/**
 * Resolves a wildcard string to the record types it matches.
 *
 * - `T` ends in `*` and some key of `Records` starts with the prefix: the
 *   matching keys of `Records`.
 * - `T` ends in `*` but no key matches: the template type `${Prefix}${string}`.
 * - `T` has no trailing `*`: `T` unchanged.
 */
export type ResolveLexiconWildcard<T extends string> =
// Match the prefix
T extends `${infer Prefix}*`
? // Check that at least one collection name matches the prefix (we use `true extends` because `never` extends everything)
true extends UnionMemberIsAssignableTo<
keyof Records,
`${Prefix}${string}`
>
? // If so, return known matching collection names
keyof Records & `${Prefix}${string}` extends infer Lexicon extends
string
? Lexicon
: never
: // If no collection name matches the prefix, return it as a type-level wildcard string
`${Prefix}${string}`
: // If there's no wildcard, return the original string
T
/**
 * The name of a collection.
 *
 * Either a known key of `Records` or any other string; the `(string & {})`
 * member admits arbitrary strings without collapsing the union to plain
 * `string`.
 */
export type Collection = keyof Records | (string & {})
/**
 * Generates all possible wildcard strings that match a given collection name.
 *
 * Recurses over the dot-separated segments: for `"a.b.c"` it produces
 * `"a.*" | "a.b.*"`. A name without a dot produces `never`.
 */
type PossibleCollectionWildcards<CollectionName extends string> =
CollectionName extends `${infer Prefix}.${infer Suffix}`
? `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}`
: never
/**
 * The name of a collection or a wildcard string matching multiple collections.
 *
 * Combines every wildcard derivable from the keys of `Records` with
 * {@link Collection}.
 */
export type CollectionOrWildcard =
| PossibleCollectionWildcards<keyof Records>
| Collection
/**
 * Values of the `kind` discriminator carried by events emitted by
 * {@link Jetstream}.
 * @enum
 */
export const EventType = {
  /** The event describes a new commit. */
  Commit: "commit",
  /** The event describes an update to an account's status. */
  Account: "account",
  /** The event describes an update to an account's identity. */
  Identity: "identity",
} as const
/** Union of the string values held by the {@link EventType} object. */
export type EventType = (typeof EventType)[keyof typeof EventType]
/**
 * Values of the `operation` field that a received commit can carry.
 * @enum
 */
export const CommitType = {
  /** The commit created a record. */
  Create: "create",
  /** The commit updated a record. */
  Update: "update",
  /** The commit deleted a record. */
  Delete: "delete",
} as const
/** Union of the string values held by the {@link CommitType} object. */
export type CommitType = (typeof CommitType)[keyof typeof CommitType]
/**
 * The base type for events emitted by the {@link Jetstream} class.
 */
export interface EventBase {
/** DID carried by the event (presumably the repository/account it concerns — confirm). */
did: At.DID
/**
 * Event timestamp. `Jetstream.subscribe` compares it with, and stores it as,
 * the resume cursor. NOTE(review): the `_us` suffix suggests microseconds — confirm.
 */
time_us: number
/** Discriminator identifying which kind of event this is. */
kind: EventType
}
/**
 * A commit event. Represents a commit to a user repository.
 *
 * `RecordType` is the collection name the commit belongs to.
 */
export interface CommitEvent<RecordType extends string> extends EventBase {
/** Always `EventType.Commit`. */
kind: typeof EventType.Commit
/** The commit itself: a create, update, or delete. */
commit: Commit<RecordType>
}
/** A commit event where a record was created; narrows `commit` to {@link CommitCreate}. */
export interface CommitCreateEvent<RecordType extends string>
extends CommitEvent<RecordType> {
/** The create commit, including the new record. */
commit: CommitCreate<RecordType>
}
/** A commit event where a record was updated; narrows `commit` to {@link CommitUpdate}. */
export interface CommitUpdateEvent<RecordType extends string>
extends CommitEvent<RecordType> {
/** The update commit, including the record it carries. */
commit: CommitUpdate<RecordType>
}
/** A commit event where a record was deleted; narrows `commit` to {@link CommitDelete}. */
export interface CommitDeleteEvent<RecordType extends string>
extends CommitEvent<RecordType> {
/** The delete commit; it carries no record. */
commit: CommitDelete<RecordType>
}
/**
 * An account event. Represents a change to an account's status on a host (e.g. PDS or Relay).
 */
export interface AccountEvent extends EventBase {
/** Always `EventType.Account`. */
kind: typeof EventType.Account
/** Account payload; `Jetstream.subscribe` drops events whose `account.did` is missing. */
account: ComAtprotoSyncSubscribeRepos.Account
}
/**
 * An identity event. Represents a change to an account's identity.
 */
export interface IdentityEvent extends EventBase {
/** Always `EventType.Identity`. */
kind: typeof EventType.Identity
/** Identity payload; `Jetstream.subscribe` drops events whose `identity.did` is missing. */
identity: ComAtprotoSyncSubscribeRepos.Identity
}
/**
 * The base type for commit events.
 *
 * `Jetstream.subscribe` drops commits where `collection`, `rkey`, or `rev` is
 * missing.
 */
export interface CommitBase<RecordType extends string> {
/** Which kind of change the commit represents. */
operation: CommitType
/** Revision string of the commit (presumably the repository revision — confirm). */
rev: string
/** Name of the collection the record belongs to. */
collection: RecordType
/** Record key (presumably unique within the collection — confirm). */
rkey: string
}
/**
 * A commit event representing a new record.
 */
export interface CommitCreate<RecordType extends string>
extends CommitBase<RecordType> {
/** Always `CommitType.Create`. */
operation: typeof CommitType.Create
/** The created record, typed from the collection name via {@link ResolveLexicon}. */
record: ResolveLexicon<RecordType>
/** CID carried by the commit (presumably the record's content identifier — confirm). */
cid: At.CID
}
/**
 * A commit event representing an update to an existing record.
 */
export interface CommitUpdate<RecordType extends string>
extends CommitBase<RecordType> {
/** Always `CommitType.Update`. */
operation: typeof CommitType.Update
/** The record carried by the update, typed from the collection name via {@link ResolveLexicon}. */
record: ResolveLexicon<RecordType>
/** CID carried by the commit (presumably the record's content identifier — confirm). */
cid: At.CID
}
/**
 * A commit event representing a deletion of an existing record.
 *
 * Unlike create and update commits, it declares no `record` or `cid` field.
 */
export interface CommitDelete<RecordType extends string>
extends CommitBase<RecordType> {
/** Always `CommitType.Delete`. */
operation: typeof CommitType.Delete
}
/**
 * A commit event.
 *
 * Union of the create, update, and delete commit shapes, discriminated by
 * their `operation` field.
 */
export type Commit<RecordType extends string> =
| CommitCreate<RecordType>
| CommitUpdate<RecordType>
| CommitDelete<RecordType>
// Alternative sketch built on the `@skyware/jetstream` client: created records
// from `app.bsky.feed.post` are pushed into a Stream, a client error fails the
// stream, and failures are retried on an exponential schedule starting at
// 1 second. The resulting stream is not assigned or run here.
Stream.asyncPush<any, Error>((emit) =>
  Effect.gen(function* () {
    // Construct the client lazily so each (re)start gets a fresh instance.
    const acquire = Effect.sync(
      () =>
        new SWJetstream({
          wantedCollections: ["app.bsky.feed.post"],
        }),
    )
    // Close the client when the stream's scope is released.
    const client = yield* Effect.acquireRelease(acquire, (opened) =>
      Effect.sync(() => opened.close()),
    )

    client.start()
    client.onCreate("app.bsky.feed.post", (created) => {
      emit.single(created)
    })
    client.on("error", (failure) => {
      emit.fail(failure)
    })
  }),
).pipe(Stream.retry(Schedule.exponential("1 second")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment