Added a typed EventEmitter to the deck generation pipeline so we can tap into raw card HTML before post-processing (layout formatting, image injection, etc.).
Generic, typesafe EventEmitter<EventPayloads> class. Supports:
- Typed
on/off/emitkeyed by an event map pipe()to forward events to another emittersetup/teardownlifecycle hooks (called when first listener registers / last unregisters)
Standalone utility, no dependencies.
ProcessSectionOptionsnow accepts an optionalemitter?: EventEmitter<{ unprocessedCardHtml: string }>- Inside
applySectionFormattingForGenerateStream, we emitunprocessedCardHtmlwith the raw section chunk beforeapplySectionFormattingForGenerateruns on it
GenerateDeckWithLayoutvars now require anemitterand pass it through to the stream processor
- Creates the emitter at the call site (generate card content flow)
- Subscribes to
unprocessedCardHtmlwith a placeholder// put in redishandler
This gives us a hook to capture raw LLM output per card before any post-processing transforms it. The intended use is caching unprocessed HTML in Redis so it can be accessed independently.
The // put in redis handler is a stub. Next step is wiring up the actual Redis write + whatever consumes it downstream.
type Handler<A> = (val: A) => void
type UnsubscribeFn = () => void
export class EventEmitter<
EventPayloads,
Keys extends keyof EventPayloads = keyof EventPayloads,
> {
protected listeners: {
[s in Keys]: Handler<EventPayloads[Keys]>[]
} = {} as any
protected piped: EventEmitter<EventPayloads, Keys>[] = []
/**
* This method will be called the first time a listener is registered for
* a given event of this emitter.
* More specifically, when the count of listeners goes from 0 to more
*/
protected setup(_type: Keys) {}
/**
* This method will be called after the last listener is unregistered for a
* particular event.
*/
protected teardown(_type: Keys) {}
on<K extends Keys>(
key: K,
handler: Handler<EventPayloads[K]>,
): UnsubscribeFn {
this.listeners[key] = this.listeners[key] || []
this.listeners[key].push(handler)
if (this.listeners[key].length === 1) {
this.setup(key)
}
return () => {
this.off(key, handler)
}
}
off<K extends Keys>(key: K): void
off<K extends Keys>(key: K, handler: Handler<EventPayloads[K]>): void
off<K extends Keys>(key: K, handler?: Handler<EventPayloads[K]>) {
if (typeof handler === 'function') {
const ind = this.listeners[key].indexOf(handler)
if (ind > -1) {
this.listeners[key].splice(ind, 1)
if (this.listeners[key].length === 0) {
this.teardown(key)
}
}
} else {
if (this.listeners[key].length > 0) {
this.teardown(key)
}
this.listeners[key] = []
}
}
emit<L extends Keys>(key: L, payload: EventPayloads[L]) {
const fns = this.listeners[key] || []
fns.forEach((fn) => fn(payload))
this.piped.forEach((emitter) => {
emitter.emit(key, payload)
})
}
pipe(emitter: EventEmitter<EventPayloads, Keys>): UnsubscribeFn {
this.piped.push(emitter)
return () => {
const ind = this.piped.indexOf(emitter)
if (ind > -1) {
this.piped.splice(ind, 1)
}
}
}
}diff --git a/packages/server/src/ai/jsx/prompts/GenerateDeck/generate-deck-with-layout.prompt.tsx b/packages/server/src/ai/jsx/prompts/GenerateDeck/generate-deck-with-layout.prompt.tsx
index dd9dbce9ef..488962207c 100644
--- a/packages/server/src/ai/jsx/prompts/GenerateDeck/generate-deck-with-layout.prompt.tsx
+++ b/packages/server/src/ai/jsx/prompts/GenerateDeck/generate-deck-with-layout.prompt.tsx
@@ -1,5 +1,6 @@
import { createPrompt, streamPrompt } from '@gammatech/aijsx'
+import { EventEmitter } from '../../../../common/event-emitter'
import { evaluateCardLayouts } from './evaluators/card-layouts.evaluator'
import { GenerateDeckContinuousPrompt } from './generate-deck-continuous.prompt'
import { generateDeckEvaluator } from './generate-deck.evaluator'
@@ -11,7 +12,11 @@ import {
import { setupWithLayoutGeneration } from './utils/generationHelpers'
import { applySectionFormattingForGenerateStream } from './utils/streamSectionProcessor'
-async function* GenerateDeckWithLayout(vars: GenerateDeckWithLayoutVariables) {
+async function* GenerateDeckWithLayout(
+ vars: GenerateDeckWithLayoutVariables & {
+ emitter: EventEmitter<{ unprocessedCardHtml: string }>
+ },
+) {
const { styleReferenceKey, infographicStyleReferenceKey } =
await setupWithLayoutGeneration(vars)
@@ -23,6 +28,7 @@ async function* GenerateDeckWithLayout(vars: GenerateDeckWithLayoutVariables) {
theme: vars.theme,
blockFlags: vars.blockFlags,
textAmount: vars.textAmount,
+ emitter: vars.emitter,
imageParams: {
model: vars.imageOptions?.model,
stylePrompt: vars.imageOptions?.stylePrompt,
diff --git a/packages/server/src/ai/jsx/prompts/GenerateDeck/utils/streamSectionProcessor.ts b/packages/server/src/ai/jsx/prompts/GenerateDeck/utils/streamSectionProcessor.ts
index 19bb167a30..f6c5bfcd04 100644
--- a/packages/server/src/ai/jsx/prompts/GenerateDeck/utils/streamSectionProcessor.ts
+++ b/packages/server/src/ai/jsx/prompts/GenerateDeck/utils/streamSectionProcessor.ts
@@ -1,6 +1,7 @@
import { CardDimensionsKey } from '@gammatech/lib/dist/cards'
import { DocFormatKey } from '@gammatech/lib/dist/prosemirror-helpers'
+import { EventEmitter } from '../../../../../common/event-emitter'
import { ImageParams } from '../postProcessing/image'
import { applySectionFormattingForGenerate } from '../postProcessing/postProcessSection'
import { BlockFlags, TextAmount, Theme } from '../schema'
@@ -12,6 +13,7 @@ type ProcessSectionOptions = {
imageParams: ImageParams
blockFlags?: BlockFlags
textAmount?: TextAmount
+ emitter?: EventEmitter<{ unprocessedCardHtml: string }>
}
export type SectionMatch = {
id: string
@@ -38,6 +40,7 @@ export async function* applySectionFormattingForGenerateStream(
// Otherwise, it's a section - apply formatting
try {
+ options.emitter?.emit('unprocessedCardHtml', chunk)
const sectionHtmlWithLayouts = applySectionFormattingForGenerate({
sectionHtml: chunk,
cardIndex,
diff --git a/packages/server/src/public-api/public-api.service.ts b/packages/server/src/public-api/public-api.service.ts
index 413ade971d..9daa78c2bd 100644
--- a/packages/server/src/public-api/public-api.service.ts
+++ b/packages/server/src/public-api/public-api.service.ts
@@ -43,6 +43,7 @@ import { GenerateDeckFromRemixVariables } from '../ai/jsx/prompts/RemixDeck/sche
import { ImageUploadService } from '../ai/upload/image.service'
import { CardsService } from '../cards/cards.service'
import { asyncPipe } from '../common/asyncPipe'
+import { EventEmitter } from '../common/event-emitter'
import { CreditOperation } from '../credits/types'
import { incrementWithContainerId } from '../datadog-metrics'
import { DocsService } from '../docs/docs.service'
@@ -2118,6 +2119,14 @@ export class PublicAPIService {
styleImageService: this.styleImageService,
availableProductFeatures,
})
+
+ class Emitter extends EventEmitter<{ unprocessedCardHtml: string }> {}
+ const emitter = new Emitter()
+
+ emitter.on('unprocessedCardHtml', (html) => {
+ // put in redis
+ })
+
;[cardContent, processedCardMarginSettings] = await Promise.all([
// Process card content stream
(async () => {
@@ -2128,6 +2137,7 @@ export class PublicAPIService {
workspaceId,
interactionId,
variables: {
+ emitter,
input: options.inputText,
docId, // For tracing
numCards: options.numCards,