Ah, I see the issue! This is a race condition in your queue processing system. Let me explain what's happening:
When you send two messages quickly:
- Message 1 ("weather") → enqueued
- Message 2 ("news") → enqueued
- Processing loop picks up Message 1 → calls agent.process(msg1)
- Before Message 1 finishes processing, the loop picks up Message 2 → calls agent.process(msg2)
- Both messages are processed in the same OpenCode session
- Message 2 overwrites/interferes with Message 1's processing
- You get the same response (from Message 2) twice
Look at your kernel processing loop:
```typescript
// kernel.ts - runs every 1 second
setInterval(async () => {
  const queuedItem = await this.queue.dequeue<MessagePacket>("incoming");
  if (!queuedItem) return;

  const msg = queuedItem.data;
  const agent = this.agents.get("opencode");

  // ⚠️ NO LOCK - multiple interval callbacks can run this simultaneously!
  const responseText = await agent.process(msg); // This is async and slow

  // ... build the ResponsePacket from responseText ...
  await this.queue.enqueue("outgoing", response);
  await this.queue.complete("incoming", queuedItem.id);
}, 1000);
```

The issue: setInterval doesn't wait for the async callback to complete. If agent.process() takes 3 seconds, you'll have:
- Interval 1 (t=0s): Processing message 1
- Interval 2 (t=1s): Processing message 2 (while message 1 is still processing!)
- Interval 3 (t=2s): Processing message 3 (while both 1 and 2 are processing!)
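You can reproduce this interleaving with a standalone sketch (a hypothetical example, not from your codebase): a 1-second interval whose async callback takes about 3 seconds.

```typescript
// Minimal demo: setInterval does not await its async callback,
// so slow callbacks pile up and run concurrently.
let inFlight = 0;

setInterval(async () => {
  inFlight++;
  console.log(`callback started, ${inFlight} in flight`);

  // Stand-in for the slow agent.process() call (~3 seconds)
  await new Promise((resolve) => setTimeout(resolve, 3000));

  inFlight--;
}, 1000);
```

Within a few seconds the log reports 2-3 callbacks in flight at once, which is exactly what is happening to your messages.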
Solution 1 (Processing Lock): add a boolean flag per queue so an interval tick is skipped while a previous tick is still working:

```typescript
export class Kernel {
  private processingIncoming = false;
  private processingOutgoing = false;

  private startProcessingLoop() {
    // Incoming queue processing
    setInterval(async () => {
      if (this.processingIncoming) return; // Skip if already processing
      this.processingIncoming = true;
      try {
        const queuedItem = await this.queue.dequeue<MessagePacket>("incoming");
        if (!queuedItem) return;

        const msg = queuedItem.data;
        const agent = this.agents.get("opencode");
        if (!agent) {
          console.error("No agent available!");
          await this.queue.complete("incoming", queuedItem.id);
          return;
        }

        console.log(`[${msg.source}] Processing message ${msg.id} via ${agent.name}...`);
        try {
          const responseText = await agent.process(msg);
          const response: ResponsePacket = {
            ...msg,
            id: `${msg.id}_response`,
            payload: responseText,
            originalMessageId: msg.id,
            timestamp: Date.now(),
          };
          await this.queue.enqueue("outgoing", response);
          await this.queue.complete("incoming", queuedItem.id);
        } catch (err) {
          console.error("Agent processing failed:", err);
          await this.queue.complete("incoming", queuedItem.id);
        }
      } finally {
        this.processingIncoming = false; // Always release lock
      }
    }, 1000);

    // Outgoing queue processing
    setInterval(async () => {
      if (this.processingOutgoing) return; // Skip if already processing
      this.processingOutgoing = true;
      try {
        const queuedItem = await this.queue.dequeue<ResponsePacket>("outgoing");
        if (!queuedItem) return;

        const response = queuedItem.data;
        const outputAdapter = this.outputs.get(response.source);
        if (outputAdapter) {
          await outputAdapter.send(response);
          await this.queue.complete("outgoing", queuedItem.id);
        } else {
          console.warn(`No output adapter found for source: ${response.source}`);
          await this.queue.complete("outgoing", queuedItem.id);
        }
      } finally {
        this.processingOutgoing = false; // Always release lock
      }
    }, 1000);
  }
}
```

Solution 2 (Continuous Loop): replace setInterval with long-running loops that only dequeue the next message after the current one has finished:

```typescript
private async processIncomingQueue() {
  while (true) {
    const queuedItem = await this.queue.dequeue<MessagePacket>("incoming");
    if (!queuedItem) {
      // Queue is empty, wait before checking again
      await new Promise(resolve => setTimeout(resolve, 1000));
      continue;
    }

    const msg = queuedItem.data;
    const agent = this.agents.get("opencode");
    if (!agent) {
      console.error("No agent available!");
      await this.queue.complete("incoming", queuedItem.id);
      continue;
    }

    console.log(`[${msg.source}] Processing message ${msg.id} via ${agent.name}...`);
    try {
      const responseText = await agent.process(msg);
      const response: ResponsePacket = {
        ...msg,
        id: `${msg.id}_response`,
        payload: responseText,
        originalMessageId: msg.id,
        timestamp: Date.now(),
      };
      await this.queue.enqueue("outgoing", response);
      await this.queue.complete("incoming", queuedItem.id);
    } catch (err) {
      console.error("Agent processing failed:", err);
      await this.queue.complete("incoming", queuedItem.id);
    }
  }
}

private async processOutgoingQueue() {
  while (true) {
    const queuedItem = await this.queue.dequeue<ResponsePacket>("outgoing");
    if (!queuedItem) {
      await new Promise(resolve => setTimeout(resolve, 1000));
      continue;
    }

    const response = queuedItem.data;
    const outputAdapter = this.outputs.get(response.source);
    if (outputAdapter) {
      await outputAdapter.send(response);
      await this.queue.complete("outgoing", queuedItem.id);
    } else {
      console.warn(`No output adapter found for source: ${response.source}`);
      await this.queue.complete("outgoing", queuedItem.id);
    }
  }
}

private startProcessingLoop() {
  // Start both processing loops in parallel
  this.processIncomingQueue().catch(err =>
    console.error("Incoming queue processor crashed:", err)
  );
  this.processOutgoingQueue().catch(err =>
    console.error("Outgoing queue processor crashed:", err)
  );
}
```

A third option, if you want to process messages from different channels in parallel but serialize messages from the same channel, is a per-channel lock:
```typescript
export class Kernel {
  private channelLocks: Map<string, boolean> = new Map();

  private startProcessingLoop() {
    setInterval(async () => {
      const queuedItem = await this.queue.dequeue<MessagePacket>("incoming");
      if (!queuedItem) return;

      const msg = queuedItem.data;

      // Check if this channel is already being processed
      if (this.channelLocks.get(msg.channelId)) {
        // Put it back in the queue
        await this.queue.enqueue("incoming", msg);
        await this.queue.complete("incoming", queuedItem.id);
        return;
      }

      // Acquire lock for this channel
      this.channelLocks.set(msg.channelId, true);
      try {
        const agent = this.agents.get("opencode");
        if (!agent) {
          console.error("No agent available!");
          return;
        }

        const responseText = await agent.process(msg);
        const response: ResponsePacket = { /* ... */ };
        await this.queue.enqueue("outgoing", response);
      } catch (err) {
        console.error("Agent processing failed:", err);
      } finally {
        // Always release the channel lock
        this.channelLocks.delete(msg.channelId);
        await this.queue.complete("incoming", queuedItem.id);
      }
    }, 100); // Can be faster now since we have locks
  }
}
```

Use Solution 1 (Processing Lock) - it's the simplest fix that solves your immediate problem without major refactoring.
Then, if you want better performance later, migrate to Solution 2 (Continuous Loop), which is more elegant and doesn't keep firing timer callbacks that immediately bail out.
The key insight: Never let multiple async operations process the queue simultaneously unless you have proper locking per channel/session.
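If you later want that per-channel/session locking without re-enqueueing busy messages, the boolean map can be generalized into a small promise-chaining lock. This is a generic sketch, not part of your kernel or queue API; the PerKeyLock class and runExclusive method are hypothetical names:

```typescript
// Generic per-key async lock: tasks for the same key run one after another,
// tasks for different keys run in parallel. Hypothetical helper, shown only
// to illustrate "proper locking per channel/session".
class PerKeyLock {
  private tails = new Map<string, Promise<void>>();

  async runExclusive<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();

    // Chain this task behind whatever is already running for the key.
    const run = previous.then(task);

    // Keep the chain alive even if the task rejects, so later tasks still run.
    const tail = run.then(
      () => undefined,
      () => undefined,
    );
    this.tails.set(key, tail);

    try {
      return await run;
    } finally {
      // Clean up once nothing newer is queued for this key.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}

// Usage sketch: serialize per channel instead of re-enqueueing busy messages.
// const locks = new PerKeyLock();
// await locks.runExclusive(msg.channelId, () => agent.process(msg));
```

Tasks for the same key run strictly one after another while different keys proceed in parallel, which is the per-channel serialization described above.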