Agents with Algebraic Effects
cron-agent.ts:
import {
  Operation,
  action,
  call,
  createChannel,
  createContext,
  each,
  main,
  resource,
  sleep,
  spawn,
  suspend as effectionSuspend
} from 'effection';
import OpenAI from 'openai';
import { parseExpression } from 'cron-parser';
import { EventEmitter } from 'events';
// Types for our agent system
interface AgentState {
  id: string;
  status: 'running' | 'suspended' | 'completed' | 'failed';
  lastRun?: Date;
  nextRun?: Date;
  continuationStack: Array<() => Operation<any>>;
  context: Map<string, any>;
  metaLevel: number;
}

interface CronSchedule {
  expression: string;
  timezone?: string;
  maxRuns?: number;
  currentRuns: number;
}

interface Tool {
  name: string;
  description: string;
  execute: (params: any) => Operation<any>;
}
// Context for managing agent continuations
const AgentContext = createContext<{
  state: AgentState;
  openai: OpenAI;
  tools: Map<string, Tool>;
  eventBus: EventEmitter;
}>("AgentContext");
// Pattern 1: Prompts as Initial Continuations
class PromptContinuation {
  private prompts: Map<string, string> = new Map();
  private continuationHandlers: Map<string, (result: any) => Operation<any>> = new Map();

  constructor(private openai: OpenAI) {}

  *registerPrompt(name: string, prompt: string): Operation<void> {
    this.prompts.set(name, prompt);
  }

  *executePrompt(
    name: string,
    variables: Record<string, any> = {}
  ): Operation<string> {
    const prompt = this.prompts.get(name);
    if (!prompt) throw new Error(`Prompt ${name} not found`);

    // Interpolate variables
    const interpolated = prompt.replace(/\{\{(\w+)\}\}/g, (_, key) => variables[key] || '');

    // Create continuation boundary
    return yield* action<string>(function* (resolve) {
      const completion = yield* callOpenAI(this.openai, interpolated);

      // Check for continuation markers
      if (completion.includes("[CONTINUE]")) {
        const handler = this.continuationHandlers.get(name);
        if (handler) {
          const continued = yield* handler(completion);
          resolve(continued);
        } else {
          resolve(completion);
        }
      } else {
        resolve(completion);
      }
    }.bind(this));
  }

  *chainPrompt(
    fromName: string,
    toName: string,
    transformer?: (result: string) => string
  ): Operation<void> {
    this.continuationHandlers.set(fromName, function* (result) {
      const transformed = transformer ? transformer(result) : result;
      return yield* this.executePrompt(toName, { previousResult: transformed });
    }.bind(this));
  }
}
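// Usage sketch (the prompt names and text here are illustrative assumptions,
// not part of the pattern): register two prompts and chain them, so that when
// the model's reply to 'summarize' contains the [CONTINUE] marker, the
// 'critique' prompt runs with the reply bound to {{previousResult}}.
function* promptContinuationExample(openai: OpenAI): Operation<string> {
  const prompts = new PromptContinuation(openai);
  yield* prompts.registerPrompt('summarize',
    'Summarize: {{input}}. Append [CONTINUE] if a critique pass would help.');
  yield* prompts.registerPrompt('critique', 'Critique this summary: {{previousResult}}');
  yield* prompts.chainPrompt('summarize', 'critique');
  return yield* prompts.executePrompt('summarize', {
    input: 'Algebraic effects let agents suspend and resume work.'
  });
}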
// Pattern 2: Dynamic Prompt Weaving
class DynamicPromptWeaver {
  private fragments: Map<string, string> = new Map();
  private conditions: Map<string, (context: any) => boolean> = new Map();
  private weavingRules: Array<{
    condition: (context: any) => boolean;
    fragments: string[];
  }> = [];

  *addFragment(key: string, fragment: string, condition?: (context: any) => boolean): Operation<void> {
    this.fragments.set(key, fragment);
    if (condition) {
      this.conditions.set(key, condition);
    }
  }

  *addWeavingRule(
    condition: (context: any) => boolean,
    fragmentKeys: string[]
  ): Operation<void> {
    this.weavingRules.push({ condition, fragments: fragmentKeys });
  }

  *weave(context: any): Operation<string> {
    const activeFragments: string[] = [];

    // Apply weaving rules
    for (const rule of this.weavingRules) {
      if (rule.condition(context)) {
        for (const key of rule.fragments) {
          const fragment = this.fragments.get(key);
          if (fragment) {
            activeFragments.push(fragment);
          }
        }
      }
    }

    // Apply individual fragment conditions
    for (const [key, fragment] of this.fragments) {
      const condition = this.conditions.get(key);
      if (!condition || condition(context)) {
        if (!activeFragments.includes(fragment)) {
          activeFragments.push(fragment);
        }
      }
    }

    return activeFragments.join('\n\n');
  }
}
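// Usage sketch (fragment text and context keys are illustrative): fragments
// are included when their condition matches the context passed to weave().
function* weaverExample(): Operation<string> {
  const weaver = new DynamicPromptWeaver();
  yield* weaver.addFragment('base', 'You are a careful, methodical agent.');
  yield* weaver.addFragment('retry',
    'The previous attempt failed; prefer conservative actions.',
    (ctx) => ctx.hasError);
  yield* weaver.addWeavingRule((ctx) => ctx.runCount > 10, ['base']);
  // With hasError: true, both 'base' (unconditional) and 'retry' are woven in
  return yield* weaver.weave({ hasError: true, runCount: 3 });
}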
// Pattern 3: Prompt Handlers as Continuation Marks
class PromptHandlerSystem {
  private marks: Map<string, any> = new Map();
  private handlers: Map<string, (prompt: string, mark: any) => Operation<string>> = new Map();

  constructor(private openai: OpenAI) {
    // Register default handlers
    this.handlers.set('reasoning', function* (prompt, mark) {
      return yield* callOpenAI(this.openai, `[REASONING MODE]\n${prompt}\nLet's think step by step:`);
    }.bind(this));

    this.handlers.set('creative', function* (prompt, mark) {
      return yield* callOpenAI(this.openai, `[CREATIVE MODE]\n${prompt}\nBe imaginative and explore unusual connections:`);
    }.bind(this));

    this.handlers.set('tool_use', function* (prompt, mark) {
      return yield* callOpenAI(this.openai, `[TOOL USE MODE]\n${prompt}\nIdentify and use appropriate tools:`);
    }.bind(this));
  }

  *markPrompt(promptId: string, mark: any): Operation<void> {
    this.marks.set(promptId, mark);
  }

  *handleMarkedPrompt(promptId: string, prompt: string): Operation<string> {
    const mark = this.marks.get(promptId);
    if (mark && this.handlers.has(mark.type)) {
      const handler = this.handlers.get(mark.type)!;
      return yield* handler(prompt, mark);
    }
    return yield* callOpenAI(this.openai, prompt);
  }

  *registerHandler(
    type: string,
    handler: (prompt: string, mark: any) => Operation<string>
  ): Operation<void> {
    this.handlers.set(type, handler);
  }
}
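// Usage sketch: marking a prompt id routes it through the matching handler
// ('reasoning' is one of the defaults registered in the constructor). The
// prompt id and text are illustrative.
function* handlerExample(openai: OpenAI): Operation<string> {
  const handlers = new PromptHandlerSystem(openai);
  yield* handlers.markPrompt('plan-1', { type: 'reasoning' });
  return yield* handlers.handleMarkedPrompt('plan-1',
    'How should we partition this dataset across workers?');
}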
// Pattern 4: Meta-prompting Through Continuations
class MetaPromptingSystem {
  private metaStack: Array<{
    level: number;
    prompt: string;
    improvements: string[];
  }> = [];

  constructor(private openai: OpenAI, private maxMetaLevel: number = 3) {}

  *generateMetaPrompt(
    base: string,
    instructions: string
  ): Operation<string> {
    if (this.metaStack.length >= this.maxMetaLevel) {
      return base;
    }

    const metaPrompt = `${instructions}\n\nOriginal prompt: ${base}\n\nImproved version:`;
    const improved = yield* callOpenAI(this.openai, metaPrompt);

    this.metaStack.push({
      level: this.metaStack.length + 1,
      prompt: base,
      improvements: [improved]
    });

    return improved;
  }

  *executeWithMeta(prompt: string): Operation<string> {
    // Execute through meta-improvement chain
    let currentPrompt = prompt;

    for (let i = 0; i < Math.min(this.maxMetaLevel, 2); i++) {
      currentPrompt = yield* this.generateMetaPrompt(
        currentPrompt,
        "Optimize this prompt for clarity, specificity, and effectiveness"
      );
    }

    return yield* callOpenAI(this.openai, currentPrompt);
  }
}
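// Usage sketch: executeWithMeta() rewrites the prompt through up to two
// meta-improvement rounds before executing it. The prompt is illustrative.
function* metaExample(openai: OpenAI): Operation<string> {
  const meta = new MetaPromptingSystem(openai, 3);
  return yield* meta.executeWithMeta('Summarize the tradeoffs of event sourcing.');
}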
// Cron-like scheduling system
class CronScheduler {
  private schedules: Map<string, CronSchedule> = new Map();
  private timers: Map<string, NodeJS.Timeout> = new Map();

  *schedule(
    id: string,
    cronExpression: string,
    options: { timezone?: string; maxRuns?: number } = {}
  ): Operation<void> {
    this.schedules.set(id, {
      expression: cronExpression,
      timezone: options.timezone,
      maxRuns: options.maxRuns,
      currentRuns: 0
    });
  }

  *getNextRunTime(id: string): Operation<Date | null> {
    const schedule = this.schedules.get(id);
    if (!schedule) return null;

    if (schedule.maxRuns && schedule.currentRuns >= schedule.maxRuns) {
      return null;
    }

    try {
      const interval = parseExpression(schedule.expression, {
        tz: schedule.timezone
      });
      return interval.next().toDate();
    } catch (error) {
      console.error(`Invalid cron expression for ${id}:`, error);
      return null;
    }
  }

  *waitUntilNextRun(id: string): Operation<void> {
    const nextRun = yield* this.getNextRunTime(id);
    if (!nextRun) return;

    const now = new Date();
    const delay = nextRun.getTime() - now.getTime();

    if (delay > 0) {
      yield* sleep(delay);
    }

    const schedule = this.schedules.get(id);
    if (schedule) {
      schedule.currentRuns++;
    }
  }
}
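// Usage sketch (illustrative id and expression): schedule a job at the top of
// every hour, capped at 24 runs, and block until the next tick.
function* schedulerExample(): Operation<void> {
  const scheduler = new CronScheduler();
  yield* scheduler.schedule('hourly', '0 * * * *', { maxRuns: 24 });
  yield* scheduler.waitUntilNextRun('hourly'); // sleeps until the next boundary
}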
// Main CronAgent implementation. Yielding to the returned operation acquires
// the control interface as a resource.
function* CronAgent(config: {
  id: string;
  openaiApiKey: string;
  cronExpression: string;
  tools?: Tool[];
  maxRuns?: number;
}): Operation<{
  start: () => Operation<void>;
  suspend: () => Operation<void>;
  resume: () => Operation<void>;
  getState: () => AgentState;
}> {
  return yield* resource(function* (provide) {
    // Initialize OpenAI client
    const openai = new OpenAI({ apiKey: config.openaiApiKey });

    // Initialize agent state
    const state: AgentState = {
      id: config.id,
      status: 'suspended',
      continuationStack: [],
      context: new Map(),
      metaLevel: 0
    };

    // Initialize subsystems
    const promptCont = new PromptContinuation(openai);
    const weaver = new DynamicPromptWeaver();
    const handlers = new PromptHandlerSystem(openai);
    const metaSystem = new MetaPromptingSystem(openai);
    const scheduler = new CronScheduler();
    const eventBus = new EventEmitter();

    // Set up cron schedule
    yield* scheduler.schedule(config.id, config.cronExpression, {
      maxRuns: config.maxRuns
    });

    // Register tools
    const tools = new Map<string, Tool>();
    if (config.tools) {
      for (const tool of config.tools) {
        tools.set(tool.name, tool);
      }
    }

    // Set up prompts
    yield* promptCont.registerPrompt('main', `
      You are an autonomous agent with the following capabilities:
      - Execute scheduled tasks
      - Use available tools
      - Maintain context across runs
      - Self-improve through meta-prompting

      Current context: {{context}}
      Available tools: {{tools}}

      What should be done for this scheduled run?
    `);

    yield* promptCont.registerPrompt('tool_selection', `
      Based on the task: {{task}}
      Available tools: {{toolList}}

      Select the appropriate tool and parameters. Respond in JSON format:
      {
        "tool": "tool_name",
        "parameters": {}
      }
    `);

    // Set up dynamic fragments
    yield* weaver.addFragment('error_context',
      'Previous run encountered an error. Be more careful.',
      (ctx) => ctx.hasError
    );

    yield* weaver.addFragment('performance_optimization',
      'Focus on performance and efficiency.',
      (ctx) => ctx.runCount > 10
    );
    // Main agent loop
    const agentLoop = function* (): Operation<void> {
      while (state.status !== 'completed') {
        try {
          // Stop once the schedule is exhausted (e.g. maxRuns reached);
          // otherwise an exhausted schedule would spin the loop without delay
          const upcoming = yield* scheduler.getNextRunTime(config.id);
          if (!upcoming) {
            state.status = 'completed';
            eventBus.emit('completed', state);
            break;
          }

          // Wait for next scheduled run
          state.status = 'suspended';
          eventBus.emit('suspended', state);
          yield* scheduler.waitUntilNextRun(config.id);

          state.status = 'running';
          state.lastRun = new Date();
          eventBus.emit('running', state);

          // Prepare context
          const context = {
            runCount: state.context.get('runCount') || 0,
            lastResult: state.context.get('lastResult'),
            hasError: state.context.get('hasError') || false
          };

          // Weave dynamic prompt
          const dynamicContext = yield* weaver.weave(context);

          // Execute main prompt with continuation support
          const mainPromptId = `main_${Date.now()}`;
          yield* handlers.markPrompt(mainPromptId, {
            type: 'reasoning',
            context
          });

          const task = yield* handlers.handleMarkedPrompt(
            mainPromptId,
            dynamicContext + '\n' + (yield* promptCont.executePrompt('main', {
              context: JSON.stringify(context),
              tools: Array.from(tools.keys()).join(', ')
            }))
          );

          // Check for tool usage (a crude keyword heuristic)
          if (task.includes('[USE_TOOL]') || task.includes('tool')) {
            const toolSelection = yield* metaSystem.executeWithMeta(
              yield* promptCont.executePrompt('tool_selection', {
                task,
                toolList: JSON.stringify(Array.from(tools.entries()).map(([name, tool]) => ({
                  name,
                  description: tool.description
                })))
              })
            );

            try {
              const { tool: toolName, parameters } = JSON.parse(toolSelection);
              const tool = tools.get(toolName);
              if (tool) {
                const result = yield* tool.execute(parameters);
                state.context.set('lastResult', result);
                eventBus.emit('toolExecuted', { tool: toolName, result });
              }
            } catch (error) {
              console.error('Tool execution failed:', error);
              state.context.set('hasError', true);
            }
          }

          // Update context
          state.context.set('runCount', context.runCount + 1);
          state.context.set('hasError', false);

          // Check for continuation markers
          if (task.includes('[SUSPEND_UNTIL]')) {
            const match = task.match(/\[SUSPEND_UNTIL:([^\]]+)\]/);
            if (match) {
              const resumeTime = new Date(match[1]);
              const delay = resumeTime.getTime() - Date.now();
              if (delay > 0) {
                yield* sleep(delay);
              }
            }
          }

          // Calculate next run
          state.nextRun = (yield* scheduler.getNextRunTime(config.id)) ?? undefined;
        } catch (error) {
          state.status = 'failed';
          state.context.set('hasError', true);
          eventBus.emit('error', { state, error });

          // Wait before retry
          yield* sleep(60000); // 1 minute
        }
      }
    };
const control = { | |
start: function* (): Operation<void> { | |
yield* spawn(agentLoop()); | |
}, | |
suspend: function* (): Operation<void> { | |
state.status = 'suspended'; | |
// Capture current continuation | |
yield* action(function* (resolve) { | |
state.continuationStack.push(() => agentLoop()); | |
resolve(); | |
}); | |
}, | |
resume: function* (): Operation<void> { | |
if (state.continuationStack.length > 0) { | |
const continuation = state.continuationStack.pop()!; | |
yield* spawn(continuation()); | |
} | |
}, | |
getState: () => state | |
}; | |
try { | |
yield* provide(control); | |
} finally { | |
// Cleanup | |
eventBus.removeAllListeners(); | |
} | |
}); | |
} | |
// Helper function for OpenAI calls: call() wraps the promise-returning SDK
// method as an Effection operation
function* callOpenAI(client: OpenAI, prompt: string): Operation<string> {
  const completion = yield* call(() =>
    client.chat.completions.create({
      model: 'gpt-4-turbo-preview',
      messages: [{ role: 'user', content: prompt }],
      temperature: 0.7,
      max_tokens: 1000
    })
  );
  return completion.choices[0]?.message?.content ?? '';
}
// Example usage
async function runExample() {
  await main(function* () {
    // Define some example tools
    const tools: Tool[] = [
      {
        name: 'fetchData',
        description: 'Fetches data from an API endpoint',
        execute: function* (params: { url: string }) {
          // Wrap the promise-returning fetch API in an Effection operation
          const response = yield* call(() => fetch(params.url));
          return yield* call(() => response.json());
        }
      },
      {
        name: 'processData',
        description: 'Processes and transforms data',
        execute: function* (params: { data: any, operation: string }) {
          // Simulated data processing
          yield* sleep(1000);
          return { processed: true, operation: params.operation };
        }
      },
      {
        name: 'saveResult',
        description: 'Saves results to storage',
        execute: function* (params: { key: string, value: any }) {
          // Simulated save operation
          console.log(`Saving ${params.key}:`, params.value);
          return { saved: true, key: params.key };
        }
      }
    ];

    // Create an agent that runs every 5 minutes
    const agent = yield* CronAgent({
      id: 'data-processor',
      openaiApiKey: process.env.OPENAI_API_KEY!,
      cronExpression: '*/5 * * * *', // every 5 minutes
      tools,
      maxRuns: 10 // stop after 10 runs
    });

    // Start the agent
    yield* agent.start();

    // Monitor agent events. Note: nothing sends on this channel as written;
    // the agent reports on its internal EventEmitter. The channel is kept
    // here as a hook for wiring events out.
    const channel = createChannel<{ event: string; data: any }>();
    yield* spawn(function* () {
      for (const { event, data } of yield* each(channel)) {
        console.log(`Agent event: ${event}`, data);
        yield* each.next();
      }
    });

    // Keep running until interrupted
    yield* effectionSuspend();
  });
}
// Export main components
export {
  CronAgent,
  PromptContinuation,
  DynamicPromptWeaver,
  PromptHandlerSystem,
  MetaPromptingSystem,
  CronScheduler,
  type AgentState,
  type Tool
};
The second file layers evals and OpenTelemetry-based observability on top of cron-agent.ts:
import {
  Operation,
  Channel,
  Stream,
  createChannel,
  createContext,
  each,
  main,
  resource,
  sleep,
  spawn,
  suspend
} from 'effection';
import {
  CronAgent,
  CronScheduler,
  PromptContinuation,
  DynamicPromptWeaver,
  type Tool
} from './cron-agent';
import OpenAI from 'openai';
import {
  metrics,
  trace,
  SpanStatusCode,
  ValueType,
  type Tracer,
  type Meter,
  type Histogram,
  type Counter,
  type ObservableGauge
} from '@opentelemetry/api';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { MeterProvider, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
// Types for eval system
interface EvalDefinition {
  id: string;
  name: string;
  description: string;
  promptTemplate: string;
  scoringCriteria: {
    metric: string;
    threshold: number;
    comparison: 'gt' | 'lt' | 'eq' | 'gte' | 'lte';
  }[];
  metadata?: Record<string, any>;
}

interface EvalResult {
  evalId: string;
  timestamp: Date;
  score: number;
  passed: boolean;
  details: {
    input: string;
    output: string;
    expected?: string;
    reasoning?: string;
  };
  latency: number;
  tokenUsage?: {
    prompt: number;
    completion: number;
    total: number;
  };
}
// Context for observability state
const ObservabilityContext = createContext<{
  tracer: Tracer;
  meter: Meter;
  evalChannel: Channel<EvalResult, void>;
  metrics: {
    evalScore: Histogram;
    evalLatency: Histogram;
    evalCount: Counter;
    evalPassRate: ObservableGauge;
    agentExecutions: Counter;
    continuationCaptures: Counter;
    toolExecutions: Counter;
  };
}>("ObservabilityContext");
// Pattern 1 Extension: Eval Prompts as Continuations
class EvalPromptContinuation extends PromptContinuation {
  private evalDefinitions: Map<string, EvalDefinition> = new Map();

  *registerEval(definition: EvalDefinition): Operation<void> {
    this.evalDefinitions.set(definition.id, definition);
    yield* this.registerPrompt(`eval_${definition.id}`, definition.promptTemplate);
    // Pass-through template used by scoreResult below
    yield* this.registerPrompt('scoring', '{{prompt}}');
  }

  *executeEval(
    evalId: string,
    input: string,
    context: Record<string, any> = {}
  ): Operation<EvalResult> {
    const observability = yield* ObservabilityContext;
    const tracer = observability.tracer;

    return yield* tracer.startActiveSpan(`eval.${evalId}`, function* (span) {
      span.setAttributes({
        'eval.id': evalId,
        'eval.input.length': input.length
      });

      const startTime = Date.now();

      try {
        const evalDef = this.evalDefinitions.get(evalId);
        if (!evalDef) throw new Error(`Eval ${evalId} not found`);

        // Execute eval prompt with continuation boundary
        const output = yield* this.executePrompt(`eval_${evalId}`, {
          input,
          ...context
        });

        const latency = Date.now() - startTime;

        // Score the result
        const score = yield* this.scoreResult(evalDef, input, output);
        const passed = this.checkCriteria(evalDef, score);

        const result: EvalResult = {
          evalId,
          timestamp: new Date(),
          score,
          passed,
          details: { input, output },
          latency
        };

        // Record metrics
        observability.metrics.evalScore.record(score, { evalId, passed: String(passed) });
        observability.metrics.evalLatency.record(latency, { evalId });
        observability.metrics.evalCount.add(1, { evalId, passed: String(passed) });

        span.setStatus({ code: passed ? SpanStatusCode.OK : SpanStatusCode.ERROR });
        span.end();

        return result;
      } catch (error) {
        span.recordException(error as Error);
        span.setStatus({ code: SpanStatusCode.ERROR });
        span.end();
        throw error;
      }
    }.bind(this));
  }

  private *scoreResult(
    evalDef: EvalDefinition,
    input: string,
    output: string
  ): Operation<number> {
    // Use a scoring prompt to grade the result
    const scoringPrompt = `
      Evaluate this output based on the criteria:
      Input: ${input}
      Output: ${output}
      Criteria: ${JSON.stringify(evalDef.scoringCriteria)}

      Provide a score from 0-100 with reasoning.
      Format: {"score": <number>, "reasoning": "<text>"}
    `;

    const response = yield* this.executePrompt('scoring', { prompt: scoringPrompt });
    // Assumes the model returns the requested JSON shape
    const { score } = JSON.parse(response);
    return score;
  }

  private checkCriteria(evalDef: EvalDefinition, score: number): boolean {
    return evalDef.scoringCriteria.every(criterion => {
      switch (criterion.comparison) {
        case 'gt': return score > criterion.threshold;
        case 'lt': return score < criterion.threshold;
        case 'eq': return score === criterion.threshold;
        case 'gte': return score >= criterion.threshold;
        case 'lte': return score <= criterion.threshold;
      }
    });
  }
}
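// Usage sketch (illustrative eval definition and input): register an eval and
// run it once. executeEval reads ObservabilityContext, so a caller such as
// ObservabilityAgent must have set it first.
function* evalExample(openai: OpenAI): Operation<EvalResult> {
  const evals = new EvalPromptContinuation(openai);
  yield* evals.registerEval({
    id: 'clarity',
    name: 'Clarity Eval',
    description: 'Scores how clearly the model answers',
    promptTemplate: 'Answer clearly and concisely: {{input}}',
    scoringCriteria: [{ metric: 'clarity', threshold: 70, comparison: 'gte' }]
  });
  return yield* evals.executeEval('clarity', 'What is backpressure?');
}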
// Pattern 2 Extension: Dynamic Eval Weaving
class DynamicEvalWeaver extends DynamicPromptWeaver {
  *addEvalFragment(
    evalId: string,
    fragment: string,
    // The condition receives the weave context built by weaveEvalContext
    condition: (context: { hasFailures: boolean; avgScore: number; recentResults: EvalResult[] }) => boolean
  ): Operation<void> {
    yield* this.addFragment(`eval_${evalId}`, fragment, condition);
  }

  *weaveEvalContext(
    recentResults: EvalResult[]
  ): Operation<string> {
    const context = {
      hasFailures: recentResults.some(r => !r.passed),
      avgScore: recentResults.length
        ? recentResults.reduce((acc, r) => acc + r.score, 0) / recentResults.length
        : 0,
      recentResults
    };
    return yield* this.weave(context);
  }
}
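// Usage sketch (illustrative fragment): weave remediation text into the next
// prompt whenever recent eval results contain failures.
function* evalWeaverExample(results: EvalResult[]): Operation<string> {
  const weaver = new DynamicEvalWeaver();
  yield* weaver.addEvalFragment('regression',
    'Recent evals failed; double-check reasoning before answering.',
    (ctx) => ctx.hasFailures);
  return yield* weaver.weaveEvalContext(results);
}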
// Observability Agent Implementation
function* ObservabilityAgent(config: {
  openaiApiKey: string;
  otlpEndpoint: string;
  serviceName: string;
  evalDefinitions: EvalDefinition[];
  evalSchedule?: string; // Cron expression for eval runs
}): Operation<{
  runEval: (evalId: string, input: string) => Operation<EvalResult>;
  runAllEvals: () => Operation<EvalResult[]>;
  getMetrics: () => Operation<Record<string, any>>;
  streamEvalResults: () => Stream<EvalResult, void>;
}> {
  return yield* resource(function* (provide) {
    // Initialize OpenTelemetry
    const traceExporter = new OTLPTraceExporter({
      url: `${config.otlpEndpoint}/v1/traces`
    });

    const metricExporter = new OTLPMetricExporter({
      url: `${config.otlpEndpoint}/v1/metrics`
    });

    const tracerProvider = new NodeTracerProvider();
    tracerProvider.addSpanProcessor(new BatchSpanProcessor(traceExporter));
    tracerProvider.register();

    const meterProvider = new MeterProvider({
      readers: [
        new PeriodicExportingMetricReader({
          exporter: metricExporter,
          exportIntervalMillis: 10000
        })
      ]
    });
    metrics.setGlobalMeterProvider(meterProvider);

    // Register instrumentations
    registerInstrumentations({
      instrumentations: [new HttpInstrumentation()]
    });

    const tracer = trace.getTracer(config.serviceName, '1.0.0');
    const meter = metrics.getMeter(config.serviceName, '1.0.0');

    // Create metrics
    const evalScore = meter.createHistogram('agent.eval.score', {
      description: 'Eval scores',
      unit: 'score',
      valueType: ValueType.DOUBLE
    });

    const evalLatency = meter.createHistogram('agent.eval.latency', {
      description: 'Eval execution latency',
      unit: 'ms',
      valueType: ValueType.INT
    });

    const evalCount = meter.createCounter('agent.eval.count', {
      description: 'Number of evals executed'
    });

    const agentExecutions = meter.createCounter('agent.executions', {
      description: 'Number of agent executions'
    });

    const continuationCaptures = meter.createCounter('agent.continuation.captures', {
      description: 'Number of continuation captures'
    });

    const toolExecutions = meter.createCounter('agent.tool.executions', {
      description: 'Number of tool executions'
    });

    // Create observable gauge for pass rate
    let passRateValue = 0;
    const evalPassRate = meter.createObservableGauge('agent.eval.pass_rate', {
      description: 'Current eval pass rate'
    });
    evalPassRate.addCallback(result => {
      result.observe(passRateValue);
    });
    // Initialize subsystems
    const openai = new OpenAI({ apiKey: config.openaiApiKey });
    const evalPrompts = new EvalPromptContinuation(openai);
    const evalWeaver = new DynamicEvalWeaver();
    const evalChannel = createChannel<EvalResult, void>();

    // Register evals
    for (const evalDef of config.evalDefinitions) {
      yield* evalPrompts.registerEval(evalDef);
    }

    // Set up observability context
    const observabilityContext = {
      tracer,
      meter,
      evalChannel,
      metrics: {
        evalScore,
        evalLatency,
        evalCount,
        evalPassRate,
        agentExecutions,
        continuationCaptures,
        toolExecutions
      }
    };

    // Track eval results for pass rate calculation
    const recentResults: EvalResult[] = [];
    const MAX_RECENT_RESULTS = 100;

    yield* spawn(function* () {
      for (const result of yield* each(evalChannel)) {
        recentResults.push(result);
        if (recentResults.length > MAX_RECENT_RESULTS) {
          recentResults.shift();
        }
        // Update pass rate
        const passed = recentResults.filter(r => r.passed).length;
        passRateValue = passed / recentResults.length;
        yield* each.next();
      }
    });
    // Eval execution functions
    const runEval = function* (
      evalId: string,
      input: string
    ): Operation<EvalResult> {
      // Make the observability context available to executeEval (contexts
      // set on the current scope flow to child operations)
      yield* ObservabilityContext.set(observabilityContext);
      const result = yield* evalPrompts.executeEval(evalId, input);
      yield* evalChannel.send(result);
      return result;
    };

    const runAllEvals = function* (): Operation<EvalResult[]> {
      yield* ObservabilityContext.set(observabilityContext);
      return yield* tracer.startActiveSpan('eval.run_all', function* (span) {
        const results: EvalResult[] = [];

        for (const evalDef of config.evalDefinitions) {
          try {
            // Generate test input based on recent agent activity
            const testInput = yield* generateTestInput(evalDef);
            const result = yield* runEval(evalDef.id, testInput);
            results.push(result);
          } catch (error) {
            span.recordException(error as Error);
          }
        }

        span.setAttributes({
          'eval.total': results.length,
          'eval.passed': results.filter(r => r.passed).length
        });
        span.end();

        return results;
      });
    };
    const getMetrics = function* (): Operation<Record<string, any>> {
      return {
        totalEvals: recentResults.length,
        passRate: passRateValue,
        avgScore: recentResults.length
          ? recentResults.reduce((acc, r) => acc + r.score, 0) / recentResults.length
          : 0,
        recentFailures: recentResults.filter(r => !r.passed).slice(-10)
      };
    };

    const control = {
      runEval,
      runAllEvals,
      getMetrics,
      // a Channel is itself a Stream, so hand it out directly
      streamEvalResults: () => evalChannel
    };
    try {
      yield* provide(control);
    } finally {
      // Cleanup
      yield* evalChannel.close();
      yield* sleep(1000); // Allow final metrics export
    }
  });
}
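// Usage sketch: acquire the agent as a resource inside an Effection task and
// run a single eval. The endpoint, service name, and input are illustrative;
// 'reasoning_quality' comes from exampleEvalDefinitions below.
function* observabilityExample(): Operation<void> {
  const obs = yield* ObservabilityAgent({
    openaiApiKey: process.env.OPENAI_API_KEY!,
    otlpEndpoint: 'http://localhost:4318',
    serviceName: 'example-agent',
    evalDefinitions: exampleEvalDefinitions
  });
  const result = yield* obs.runEval('reasoning_quality', 'Why do vector clocks work?');
  console.log(`score=${result.score} passed=${result.passed}`);
}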
// Composed Agent: CronAgent with Observability. This is a plain operation so
// it can be composed into any Effection task; main() stays at the program
// entry point (see runExample below).
function* ObservableCronAgent(config: {
  id: string;
  openaiApiKey: string;
  cronExpression: string;
  tools?: Tool[];
  maxRuns?: number;
  observability: {
    otlpEndpoint: string;
    serviceName: string;
    evalDefinitions: EvalDefinition[];
    evalSchedule?: string;
  };
}): Operation<void> {
  // Initialize observability agent
  const obsAgent = yield* ObservabilityAgent({
    openaiApiKey: config.openaiApiKey,
    otlpEndpoint: config.observability.otlpEndpoint,
    serviceName: config.observability.serviceName,
    evalDefinitions: config.observability.evalDefinitions,
    evalSchedule: config.observability.evalSchedule
  });

  // Create observability-aware tools
  const observableTools: Tool[] = [
    ...(config.tools || []),
    {
      name: 'runEval',
      description: 'Run an evaluation to test agent capabilities',
      execute: function* (params: { evalId: string; input: string }) {
        return yield* obsAgent.runEval(params.evalId, params.input);
      }
    },
    {
      name: 'checkMetrics',
      description: 'Check current observability metrics',
      execute: function* () {
        return yield* obsAgent.getMetrics();
      }
    }
  ];

  // Initialize base cron agent with observability tools
  const cronAgent = yield* CronAgent({
    ...config,
    tools: observableTools
  });

  // Start monitoring eval results
  yield* spawn(function* () {
    for (const result of yield* each(obsAgent.streamEvalResults())) {
      console.log(`Eval ${result.evalId}: ${result.passed ? 'PASSED' : 'FAILED'} (${result.score})`);
      // If an eval fails badly, the agent could self-adjust
      if (!result.passed && result.score < 50) {
        console.log('Low eval score detected, agent may need adjustment');
      }
      yield* each.next();
    }
  });

  // Schedule regular eval runs if specified
  if (config.observability.evalSchedule) {
    const evalScheduler = new CronScheduler();
    yield* evalScheduler.schedule('evals', config.observability.evalSchedule);

    yield* spawn(function* () {
      while (true) {
        yield* evalScheduler.waitUntilNextRun('evals');
        console.log('Running scheduled evals...');
        const results = yield* obsAgent.runAllEvals();
        console.log(`Completed ${results.length} evals`);
      }
    });
  }

  // Start the cron agent
  yield* cronAgent.start();

  // Keep running
  yield* suspend();
}
// Helper to generate test inputs
function* generateTestInput(evalDef: EvalDefinition): Operation<string> {
  // This could be made more sophisticated based on eval type
  const templates = [
    "Explain the concept of distributed systems",
    "Write a function to sort an array",
    "Analyze the sentiment of this text: 'Great product!'",
    "Suggest improvements for this code snippet"
  ];
  return templates[Math.floor(Math.random() * templates.length)];
}
// Example usage with specific eval definitions
const exampleEvalDefinitions: EvalDefinition[] = [
  {
    id: 'reasoning_quality',
    name: 'Reasoning Quality Eval',
    description: 'Tests the agent\'s ability to provide step-by-step reasoning',
    promptTemplate: `
      Solve this problem step by step: {{input}}
      Show your reasoning clearly.
    `,
    scoringCriteria: [
      { metric: 'clarity', threshold: 70, comparison: 'gte' },
      { metric: 'correctness', threshold: 80, comparison: 'gte' }
    ]
  },
  {
    id: 'tool_usage',
    name: 'Tool Usage Eval',
    description: 'Tests the agent\'s ability to correctly use tools',
    promptTemplate: `
      Task: {{input}}
      Available tools: {{tools}}
      Select and use the appropriate tool.
    `,
    scoringCriteria: [
      { metric: 'tool_selection', threshold: 90, comparison: 'gte' }
    ]
  },
  {
    id: 'context_retention',
    name: 'Context Retention Eval',
    description: 'Tests if agent maintains context across interactions',
    promptTemplate: `
      Previous context: {{previousContext}}
      New query: {{input}}
      Respond while maintaining awareness of previous context.
    `,
    scoringCriteria: [
      { metric: 'context_awareness', threshold: 75, comparison: 'gte' }
    ]
  }
];
// Run the composed agent
async function runExample() {
  await main(function* () {
    yield* ObservableCronAgent({
      id: 'observable-data-processor',
      openaiApiKey: process.env.OPENAI_API_KEY!,
      cronExpression: '*/30 * * * *', // every 30 minutes
      maxRuns: 48, // 24 hours
      tools: [
        {
          name: 'analyzeData',
          description: 'Analyzes data patterns',
          execute: function* (params) {
            yield* sleep(1000);
            return { analysis: 'completed', patterns: ['trend_up'] };
          }
        }
      ],
      observability: {
        otlpEndpoint: process.env.OTLP_ENDPOINT || 'http://localhost:4318',
        serviceName: 'observable-cron-agent',
        evalDefinitions: exampleEvalDefinitions,
        evalSchedule: '0 * * * *' // run evals hourly
      }
    });
  });
}
// Export main components
export {
  ObservabilityAgent,
  ObservableCronAgent,
  EvalPromptContinuation,
  DynamicEvalWeaver,
  type EvalDefinition,
  type EvalResult
};