Created
May 24, 2023 08:05
-
-
Save danopia/b66023ac488c4913c3747e37669c7f55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as ows from "https://deno.land/x/[email protected]/mod.ts"; | |
import { trace, SpanKind, propagation, context, TextMapGetter, ROOT_CONTEXT, SpanContext } from "https://deno.land/x/[email protected]/opentelemetry/api.js"; | |
/**
 * A change to a single KV entry, as delivered to realtime observers.
 *
 * Discriminated on `type`:
 * - `'insert'` / `'replace'` carry the `value` that was written.
 * - `'delete'` carries no value.
 *
 * Every variant also carries the affected `key`, the `versionstamp`
 * associated with the change, and an `appliedAt` timestamp attached by
 * whoever produced the event.
 */
type KvRealtimeEvent =
  | {
    type: 'insert' | 'replace';
    appliedAt: Date;
    key: Deno.KvKey;
    value: unknown;
    versionstamp: string;
  }
  | {
    type: 'delete';
    appliedAt: Date;
    key: Deno.KvKey;
    versionstamp: string;
  };
/** Tracer under which every realtime-event span in this module is created. */
const tracer = trace.getTracer('kv-realtime');
/**
 * Wraps a Deno KV store so that every successful write made through this
 * class is also published as a `KvRealtimeEvent`.
 *
 * Events are delivered to observers registered with `observePrefix` and, when
 * a `BroadcastChannel` was supplied, posted to that channel together with the
 * propagated trace context. Events received from the channel are delivered to
 * the local observers as well.
 *
 * Only writes that go through `createKey`, `replaceKey` and `deleteKey`
 * produce events.
 */
export class KvRealtimeContext {
  /** Active prefix subscriptions; entries are added and removed by `observePrefix`. */
  observers = new Set<{
    prefix: Deno.KvKey,
    next: ows.NextFunc<KvRealtimeEvent>,
    spanCtx?: SpanContext,
  }>();

  /** Reads propagation fields back out of the plain-object carrier posted on the channel. */
  private static readonly baggageGetter: TextMapGetter<Record<string, string>> = {
    keys: (carrier) => Object.keys(carrier),
    get: (carrier, key) => carrier[key],
  };

  /**
   * @param kv Store that all reads and writes are performed against.
   * @param broadcastChannel Optional channel used to relay events; when given,
   *   incoming messages are dispatched to the local observers.
   */
  constructor(
    private readonly kv: Deno.Kv,
    private readonly broadcastChannel?: BroadcastChannel,
  ) {
    broadcastChannel?.addEventListener('message', (evt) => {
      const payload: { event: KvRealtimeEvent; baggage: Record<string, string> } = evt.data;
      // Continue the trace that was started by whoever posted the message.
      const ctx = propagation.extract(
        ROOT_CONTEXT,
        payload.baggage,
        KvRealtimeContext.baggageGetter,
      );
      // Nothing awaits an event listener, so a failing observer must be
      // handled here or it surfaces as an unhandled rejection.
      context
        .with(ctx, () => this.processEvent(payload.event))
        .catch((err: unknown) => console.error('kv event delivery failed:', err));
    });
  }

  /**
   * Reports whether `key` starts with every part of `prefix`.
   *
   * Parts are compared with strict equality so that e.g. `1`, `'1'`, `1n` and
   * `true` are never confused with each other; `Uint8Array` parts are compared
   * byte by byte. An empty prefix matches every key.
   */
  private static keyHasPrefix(key: Deno.KvKey, prefix: Deno.KvKey): boolean {
    if (key.length < prefix.length) return false;
    return prefix.every((part, idx) => KvRealtimeContext.keyPartEquals(key[idx], part));
  }

  /** Compares two key parts by type and value (by content for byte arrays). */
  private static keyPartEquals(
    a: Deno.KvKeyPart | undefined,
    b: Deno.KvKeyPart | undefined,
  ): boolean {
    if (a instanceof Uint8Array && b instanceof Uint8Array) {
      return a.length === b.length && a.every((byte, idx) => byte === b[idx]);
    }
    return a === b;
  }

  /**
   * Builds the span attributes shared by producer and consumer spans.
   * Key parts are stringified so the attribute is always a string array,
   * whatever types the key parts have.
   */
  private static spanAttributes(event: KvRealtimeEvent) {
    return {
      'kv.event': event.type,
      'kv.key': event.key.map((part) => String(part)),
      'kv.versionstamp': event.versionstamp,
    };
  }

  /** Converts a caught value into something `Span.recordException` accepts. */
  private static toException(err: unknown): Error | string {
    return err instanceof Error ? err : String(err);
  }

  /**
   * Publishes a locally produced event: posts it (with the injected trace
   * context) to the broadcast channel if there is one, then delivers it to the
   * local observers. Everything runs inside a PRODUCER span which is always
   * ended; any failure is recorded on the span and rethrown.
   */
  private async generateEvent(event: KvRealtimeEvent): Promise<void> {
    await tracer.startActiveSpan(`KvRealtime:${event.type}`, {
      kind: SpanKind.PRODUCER,
      attributes: KvRealtimeContext.spanAttributes(event),
    }, async (span) => {
      try {
        const baggage: Record<string, string> = {};
        propagation.inject(context.active(), baggage, {
          set: (carrier, name, value) => {
            carrier[name] = typeof value === 'string' ? value : String(value);
          },
        });
        this.broadcastChannel?.postMessage({ event, baggage });
        await this.processEvent(event);
      } catch (err) {
        span.recordException(KvRealtimeContext.toException(err));
        throw err;
      } finally {
        span.end();
      }
    });
  }

  /**
   * Starts `generateEvent` without making the caller wait for it or fail
   * because of it: the write has already been committed by the time this is
   * called, so delivery problems are logged instead of propagated.
   */
  private publish(event: KvRealtimeEvent): void {
    void this.generateEvent(event).catch((err: unknown) => {
      console.error('kv event delivery failed:', err);
    });
  }

  /**
   * Hands `event` to every observer whose prefix matches the event key, each
   * inside its own CONSUMER span linked to the span context captured when the
   * observer was registered. If an observer throws, the exception is recorded
   * on its span, the span is ended, and the error propagates (remaining
   * observers are not called).
   */
  private async processEvent(event: KvRealtimeEvent): Promise<void> {
    console.log('running kv event:', event.type, event.key.join('/'));
    for (const observer of this.observers) {
      if (!KvRealtimeContext.keyHasPrefix(event.key, observer.prefix)) continue;
      tracer.startActiveSpan(`KvRealtime:${event.type}`, {
        kind: SpanKind.CONSUMER,
        attributes: KvRealtimeContext.spanAttributes(event),
        links: observer.spanCtx ? [{ context: observer.spanCtx }] : [],
      }, (span) => {
        try {
          observer.next(event);
        } catch (err) {
          span.recordException(KvRealtimeContext.toException(err));
          throw err;
        } finally {
          span.end();
        }
      });
    }
  }

  /** Reads a single entry from the underlying store. */
  async getKey(key: Deno.KvKey): Promise<Deno.KvEntryMaybe<unknown>> {
    return await this.kv.get(key);
  }

  /** Reads every entry returned by `kv.list(opts)` into an array. */
  async collectList(opts: { prefix: Deno.KvKey }): Promise<Deno.KvEntry<unknown>[]> {
    const entries: Deno.KvEntry<unknown>[] = [];
    for await (const entry of this.kv.list(opts)) {
      entries.push(entry);
    }
    return entries;
  }

  /**
   * Returns a stream that concatenates, in order:
   * 1. one `'insert'` event per entry currently listed under `prefix`,
   * 2. a single `{type: 'ready'}` marker,
   * 3. the events subsequently delivered for keys matching `prefix`.
   *
   * The subscription is removed and the live part of the stream is closed
   * when `abortSignal` fires.
   *
   * @param prefix Key prefix to observe.
   * @param abortSignal Ends the subscription; registering throws if it is
   *   already aborted.
   */
  observePrefix(prefix: Deno.KvKey, abortSignal: AbortSignal) {
    return ows
      .concat<KvRealtimeEvent | {type: 'ready'}>(
        ows
          .fromIterable(this.kv.list({ prefix }))
          .pipeThrough(ows.map((entry): KvRealtimeEvent => ({
            type: 'insert',
            appliedAt: new Date(),
            key: entry.key,
            value: entry.value,
            versionstamp: entry.versionstamp,
          }))),
        ows
          .just({type: 'ready'}),
        ows
          .fromNext<KvRealtimeEvent>(next => {
            const observer = {
              prefix,
              next,
              // Remembered so consumer spans can link back to the subscriber.
              spanCtx: trace.getSpanContext(context.active()),
            };
            abortSignal.throwIfAborted();
            this.observers.add(observer);
            abortSignal.addEventListener('abort', () => {
              this.observers.delete(observer);
              next(ows.EOF);
            }, { once: true });
          }),
        // Prefix filtering is done in processEvent before `next` is called.
      );
  }

  /**
   * Writes `value` at `key` only if the key does not exist yet, and publishes
   * an `'insert'` event when the commit succeeds.
   *
   * @returns The commit result; `ok` is false when the key already existed.
   */
  async createKey(
    key: Deno.KvKey,
    value: unknown,
  ): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
    const result = await this.kv.atomic()
      .check({ key, versionstamp: null })
      .set(key, value)
      .commit();
    if (result.ok) {
      this.publish({
        type: 'insert',
        appliedAt: new Date(),
        key, value,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }

  /**
   * Overwrites `key` with `value` only if its current versionstamp equals
   * `versionstamp`, and publishes a `'replace'` event when the commit succeeds.
   *
   * @returns The commit result; `ok` is false when the check failed.
   */
  async replaceKey(
    key: Deno.KvKey,
    versionstamp: string,
    value: unknown,
  ): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
    const result = await this.kv.atomic()
      .check({ key, versionstamp })
      .set(key, value)
      .commit();
    if (result.ok) {
      this.publish({
        type: 'replace',
        appliedAt: new Date(),
        key, value,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }

  /**
   * Deletes `key` only if its current versionstamp equals `versionstamp`, and
   * publishes a `'delete'` event when the commit succeeds.
   *
   * @returns The commit result; `ok` is false when the check failed.
   */
  async deleteKey(
    key: Deno.KvKey,
    versionstamp: string,
  ): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
    const result = await this.kv.atomic()
      .check({ key, versionstamp })
      .delete(key)
      .commit();
    if (result.ok) {
      this.publish({
        type: 'delete',
        appliedAt: new Date(),
        key,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment