Skip to content

Instantly share code, notes, and snippets.

@lemassykoi
Last active April 21, 2025 01:37
Show Gist options
  • Save lemassykoi/6b372f7a3f51b91284320007433e993e to your computer and use it in GitHub Desktop.
This Python script is an asynchronous task management system that uses the Ollama LLM service to handle complex queries by breaking them into smaller tasks. It employs an event broker for communication and includes components like a client, server, supervisor, agent, and sub-agent to process and synthesize reports.
import asyncio
import logging
import json
from typing import Dict, Any
from ollama import AsyncClient
import aiohttp

# Alternative demo queries kept for quick switching during experiments.
#USER_QUERY = "Generate a report about the solar system"
#USER_QUERY = "Generate a report about large language models"
# NOTE: the active query is French; the synthesis step explicitly asks the
# LLM to answer in the language of this query.
USER_QUERY = "Rédige un rapport complet sur les transformations à venir dans le monde du travail, causées par l'intelligence artificielle abordable et démocratisée."

# Ollama connection settings: one local server, a single model reused for
# every role (decomposition, main agent, sub-agent, synthesis). Each role
# gets its own constant so models can be swapped per-role later.
OLLAMA_HOST = "http://127.0.0.1:11434"
OLLAMA_MODEL = "granite3.3:8b"
OLLAMA_MODEL_DECOMPOZE = OLLAMA_MODEL
OLLAMA_MODEL_AGENT1 = OLLAMA_MODEL
OLLAMA_MODEL_SUBAGENT1 = OLLAMA_MODEL
OLLAMA_MODEL_SYNTHESIZE = OLLAMA_MODEL

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Event Broker: Tracks progress and events
class EventBroker:
    """Minimal publish/subscribe hub for broadcasting pipeline progress events.

    Subscribers register an async callback taking one event dict; ``publish``
    schedules delivery in the background so publishers never block on slow
    listeners.
    """

    def __init__(self):
        # Registered async callbacks, each invoked for every published event.
        self.subscribers = []
        # Strong references to in-flight notification tasks. The event loop
        # keeps only weak references to tasks, so without this set a pending
        # notification task could be garbage-collected before it runs.
        self._pending_tasks = set()

    def subscribe(self, callback):
        """Register an async callable that accepts a single event dict."""
        self.subscribers.append(callback)

    async def publish(self, event: Dict[str, Any]):
        """Fan *event* out to all subscribers without awaiting delivery."""
        logging.debug(f"Event Broker: Publishing event - {event}")
        # Use asyncio.create_task for non-blocking event publishing, but keep
        # a reference until the task finishes (recommended by asyncio docs).
        task = asyncio.create_task(self._notify_subscribers(event))
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    async def _notify_subscribers(self, event: Dict[str, Any]):
        # Notify subscribers concurrently without waiting for each one.
        await asyncio.gather(*(callback(event) for callback in self.subscribers))

event_broker = EventBroker()
# Queues for communication between components (asyncio.Queue is thread-safe for async tasks)
client_to_server = asyncio.Queue()            # user command: client -> server
server_to_supervisor = asyncio.Queue()        # command forwarded: server -> supervisor
supervisor_to_agent = asyncio.Queue()         # main task handed to the agent
agent_to_supervisor = asyncio.Queue()         # agent's result back to the supervisor
supervisor_to_subsupervisor = asyncio.Queue() # list of sub-tasks for the sub-supervisor
subsupervisor_to_subagent = asyncio.Queue()   # one sub-task at a time for the sub-agent
subagent_to_subsupervisor = asyncio.Queue()   # sub-agent result back to its supervisor
subsupervisor_to_supervisor = asyncio.Queue() # For final sub-result aggregation
supervisor_to_server = asyncio.Queue()        # synthesized final report: supervisor -> server
server_to_client = asyncio.Queue()            # final report delivered to the client
# Function to decompose a query into a main task and sub-tasks using Ollama with JSON format
def _fallback_decomposition(query: str) -> Dict[str, Any]:
    """Return the default main-task/sub-tasks split used when LLM decomposition fails."""
    return {
        "main_task": f"Generate a comprehensive report about {query}",
        "sub_tasks": [
            f"Research key aspects related to {query}",
            f"Summarize the findings about {query}"
        ]
    }

async def decompose_task(query: str) -> Dict[str, Any]:
    """Ask the LLM to split *query* into one main task and two sub-tasks.

    Returns a dict with keys ``main_task`` (str) and ``sub_tasks`` (list of
    str). On any failure (network error, malformed/invalid JSON) a generic
    fallback decomposition is returned, so callers always get a usable
    structure and never see an exception.
    """
    ollama_client = AsyncClient(host=OLLAMA_HOST)
    # Defined before the try so the error path can log the raw reply (None
    # means the failure happened before/while calling the LLM).
    result_content = None
    try:
        prompt = (
            f"From the following question, generate a main task and a list of 2 sub-tasks in JSON format:\n"
            f"Question: '{query}'\n\n"
            f"Return the output as a JSON object with the following structure:\n"
            f'{{\n "main_task": "Description of the main task",\n "sub_tasks": ["Description of sub-task 1", "Description of sub-task 2"]\n}}\n\n'
            f"Ensure the response is a valid JSON object matching this structure."
        )
        message = {"role": "user", "content": prompt}
        # format="json" constrains the model output; temperature 0 + fixed
        # seed make decomposition deterministic for a given query.
        response = await ollama_client.chat(model=OLLAMA_MODEL_DECOMPOZE, messages=[message], format="json", options={"temperature": 0.0, "seed": 1234567890})
        # Parse the JSON response
        result_content = response["message"]["content"]
        logging.debug(f"Decompose Raw Response: {result_content}")
        result = json.loads(result_content)
        # Validate the structure
        if not isinstance(result, dict) or "main_task" not in result or "sub_tasks" not in result:
            raise ValueError("Invalid JSON structure: missing required fields")
        if not isinstance(result["main_task"], str) or not isinstance(result["sub_tasks"], list):
            raise ValueError("Invalid JSON structure: incorrect types")
        if not all(isinstance(sub_task, str) for sub_task in result["sub_tasks"]):
            # Try to handle if sub_tasks contains dicts like {"task": "..."}
            if all(isinstance(sub_task, dict) and "task" in sub_task and isinstance(sub_task["task"], str) for sub_task in result["sub_tasks"]):
                result["sub_tasks"] = [st["task"] for st in result["sub_tasks"]]
            else:
                raise ValueError("Invalid JSON structure: sub_tasks must be a list of strings")
        return result
    except (json.JSONDecodeError, ValueError, aiohttp.ClientError, KeyError) as e:
        logging.error(f"Failed to decompose task: {str(e)}. Raw response: '{result_content if result_content is not None else 'N/A'}'")
        # Fallback in case of failure
        return _fallback_decomposition(query)
    except Exception as e:
        logging.error(f"Unexpected error during task decomposition: {str(e)}", exc_info=True)
        # Generic fallback
        return _fallback_decomposition(query)
# Client: Initiates the request
async def client(command = USER_QUERY):
    """Send *command* to the server and block until the final report comes back."""
    async def _on_event(event):
        # Shorten long string fields so the progress log stays readable.
        compact = {}
        for key, value in event.items():
            if isinstance(value, str) and len(value) > 100:
                compact[key] = value[:100] + '...'
            else:
                compact[key] = value
        print(f"\033[94mClient: Received event - {compact}\033[0m")
    event_broker.subscribe(_on_event)
    print("\033[94mClient: Sending command to server\033[0m")
    await client_to_server.put(command)
    await event_broker.publish({"stage": "client", "message": "Command sent", "command": command})
    # Block until the server pushes the finished report back to us.
    print("\033[94mClient: Waiting for final result...\033[0m")
    final_report = await server_to_client.get()
    print(f"\n\033[1;94m--- FINAL RESULT RECEIVED BY CLIENT --- \033[0m\n{final_report}\n\033[1;94m--- END OF FINAL RESULT --- \033[0m")
    # Keep the full, untruncated result in the event payload.
    await event_broker.publish({"stage": "client", "message": "Result received", "result": final_report})
# Server Endpoint: Forwards the command to the supervisor
async def server_endpoint():
    """Relay one command from the client to the supervisor, and the result back."""
    incoming = await client_to_server.get()
    print(f"\033[92mServer Endpoint: Received command from client - {incoming}\033[0m")
    await event_broker.publish({"stage": "server", "message": "Received command", "command": incoming})
    print("\033[92mServer Endpoint: Forwarding command to supervisor\033[0m")
    await server_to_supervisor.put(incoming)
    # One-shot relay: wait for exactly one final result from the supervisor.
    print("\033[92mServer Endpoint: Waiting for final result from supervisor...\033[0m")
    final_result = await supervisor_to_server.get()
    print("\033[92mServer Endpoint: Received final result from supervisor. Forwarding to client.\033[0m")
    await event_broker.publish({"stage": "server", "message": "Forwarding result", "result": final_result})
    await server_to_client.put(final_result)
# Agent (Supervisor): Manages agents and sub-supervisors with synthesis
async def agent_supervisor():
    """Decompose the command, fan work out to the agent and sub-agents, then
    synthesize everything into one final report for the server."""
    command = await server_to_supervisor.get()
    print(f"\033[93mAgent (Supervisor): Received command - {command}\033[0m")
    await event_broker.publish({"stage": "supervisor", "message": "Received command", "command": command})

    print("\033[93mAgent (Supervisor): Decomposing task using LLM...\033[0m")
    decomposition = await decompose_task(command)
    main_task = decomposition["main_task"]
    sub_tasks = decomposition["sub_tasks"]
    print(f"\033[93mAgent (Supervisor): Decomposed into Main Task and {len(sub_tasks)} Sub-Tasks.\033[0m")
    print(f" Main Task: {main_task}")
    print(f" Sub-Tasks: {sub_tasks}\033[0m")
    await event_broker.publish({"stage": "supervisor", "message": "Decomposed task", "main_task": main_task, "sub_tasks": sub_tasks})

    print("\033[93mAgent (Supervisor): Delegating main task and sub-tasks concurrently...\033[0m")
    await supervisor_to_agent.put(main_task)
    await event_broker.publish({"stage": "supervisor", "message": "Delegated main task to agent"})
    await supervisor_to_subsupervisor.put(sub_tasks)
    await event_broker.publish({"stage": "supervisor", "message": "Delegated sub-tasks to sub-agent supervisor"})

    # Collect both results concurrently; gather preserves argument order, so
    # the first value is always the agent's and the second the sub-supervisor's.
    print("\033[93mAgent (Supervisor): Waiting for results from Agent and Sub-Agent Supervisor...\033[0m")
    agent_result, sub_result = await asyncio.gather(
        agent_to_supervisor.get(),
        subsupervisor_to_supervisor.get(),
    )
    print("\033[93mAgent (Supervisor): Received result from agent.\033[0m")
    await event_broker.publish({"stage": "supervisor", "message": "Received result from agent", "result": agent_result})
    print("\033[93mAgent (Supervisor): Received aggregated result from sub-agent supervisor.\033[0m")
    await event_broker.publish({"stage": "supervisor", "message": "Received result from sub-agent supervisor", "result": sub_result})

    # --- START SYNTHESIS STEP ---
    preliminary_combined_report = f"--- Information from Main Task ---\n{agent_result}\n\n--- Information from Sub-Tasks ---\n{sub_result}"
    final_report_content = ""
    try:
        print("\033[93mAgent (Supervisor): Synthesizing final report using LLM...\033[0m")
        await event_broker.publish({"stage": "supervisor", "message": "Starting final synthesis"})
        # Pass the original command so the synthesizer can match its language.
        final_report_content = await synthesize_final_report(preliminary_combined_report, command)
        print("\033[92mAgent (Supervisor): Final synthesis successful.\033[0m")
        await event_broker.publish({"stage": "supervisor", "message": "Final synthesis successful", "result": final_report_content})
    except Exception as e:
        print(f"\033[91mAgent (Supervisor): Final synthesis failed: {str(e)}. Falling back to combined preliminary report.\033[0m")
        logging.error("Agent Supervisor: Final synthesis step failed.", exc_info=True)
        await event_broker.publish({"stage": "supervisor", "message": "Final synthesis failed", "error": str(e)})
        # Fall back to the raw concatenation rather than losing the work.
        final_report_content = f"--- FINAL REPORT (SYNTHESIS FAILED) ---\n{preliminary_combined_report}"
    # --- END SYNTHESIS STEP ---

    print("\033[93mAgent (Supervisor): Forwarding final result to server.\033[0m")
    # The full report travels on the queue; the event carries a confirmation only.
    await event_broker.publish({"stage": "supervisor", "message": "Forwarding final result to server"})
    await supervisor_to_server.put(final_report_content)
# Agent: Executes tasks assigned by the supervisor using Ollama LLM
async def agent():
    """Take one main task from the supervisor and run it through the LLM,
    retrying up to three times; always puts a result (or failure notice)
    on agent_to_supervisor so the supervisor never hangs."""
    ollama_client = AsyncClient(host=OLLAMA_HOST)
    max_retries = 3
    task = await supervisor_to_agent.get()  # one task per agent lifetime
    for attempt_num in range(1, max_retries + 1):
        try:
            print(f"\033[95mAgent: Executing task (Attempt {attempt_num}/{max_retries}) - {task[:100]}...\033[0m")
            await event_broker.publish({"stage": "agent", "message": "Executing task", "task": task, "attempt": attempt_num})
            # Hand the raw task text to the LLM as a single user message.
            chat_messages = [{"role": "user", "content": task}]
            response = await ollama_client.chat(model=OLLAMA_MODEL_AGENT1, messages=chat_messages)
            result = response["message"]["content"]
            print("\033[95mAgent: Task completed successfully. Returning result to supervisor.\033[0m")
            await event_broker.publish({"stage": "agent", "message": "Task completed", "result": result})
            await agent_to_supervisor.put(result)
            return  # success: stop retrying
        except aiohttp.ClientError as e:
            print(f"\033[91mAgent: Network Error on attempt {attempt_num}/{max_retries} - {str(e)}\033[0m")
            await event_broker.publish({"stage": "agent", "message": "Network Error", "error": str(e), "attempt": attempt_num})
            if attempt_num == max_retries:
                print("\033[91mAgent: Max retries reached, sending failure message.\033[0m")
                await event_broker.publish({"stage": "agent", "message": "Max retries failed"})
                await agent_to_supervisor.put(f"Agent failed to execute task '{task[:50]}...' after {max_retries} retries due to network errors.")
                return  # final failure: give up
            # Exponential backoff between network retries: 1s, 2s, 4s, ...
            await asyncio.sleep(2 ** (attempt_num - 1))
        except Exception as e:
            print(f"\033[91mAgent: Unexpected Error on attempt {attempt_num}/{max_retries} - {str(e)}\033[0m")
            logging.error("Agent unexpected error", exc_info=True)
            await event_broker.publish({"stage": "agent", "message": "Unexpected Error", "error": str(e), "attempt": attempt_num})
            if attempt_num == max_retries:
                print("\033[91mAgent: Max retries reached after unexpected error, sending failure message.\033[0m")
                await event_broker.publish({"stage": "agent", "message": "Max retries failed"})
                await agent_to_supervisor.put(f"Agent failed to execute task '{task[:50]}...' after {max_retries} retries due to unexpected error: {str(e)}")
                return  # final failure: give up
            await asyncio.sleep(1)  # brief fixed pause before retrying
# SubAgent (Supervisor): Manages sub-agents, processes sub-tasks sequentially
async def subagent_supervisor():
    """Receive the sub-task list, run each sub-task through the sub-agent one
    at a time, and send the combined results up to the main supervisor.
    On any failure an error string (not an exception) is sent up, so the
    main supervisor's gather() never hangs waiting for this queue.
    """
    # Receive and validate the sub-task list from the main supervisor.
    try:
        sub_tasks = await supervisor_to_subsupervisor.get()
        if not isinstance(sub_tasks, list) or not all(isinstance(st, str) for st in sub_tasks):
            logging.error(f"SubAgent Supervisor received invalid sub_tasks format: {sub_tasks}")
            await subsupervisor_to_supervisor.put("Error: SubAgent Supervisor received invalid sub-tasks format.") # Send error up
            return
        print(f"\033[96mSubAgent (Supervisor): Received {len(sub_tasks)} sub-tasks.\033[0m")
        await event_broker.publish({
            "stage": "subagent_supervisor",
            "message": f"Received {len(sub_tasks)} sub-tasks",
            "sub_tasks": sub_tasks
        })
    except Exception as e:
        print(f"\033[91mSubAgent Supervisor: Error receiving sub-tasks: {e}\033[0m")
        logging.error("SubAgent Supervisor error receiving tasks", exc_info=True)
        await subsupervisor_to_supervisor.put(f"Error: SubAgent Supervisor failed to receive tasks: {e}") # Send error up
        return
    # Process each sub-task sequentially
    # Handshake: each put on subsupervisor_to_subagent is matched by exactly
    # one get from subagent_to_subsupervisor, so a single subagent instance
    # is sufficient and results cannot be interleaved across sub-tasks.
    sub_results = []
    for i, sub_task in enumerate(sub_tasks, 1):
        try:
            print(f"\033[96mSubAgent (Supervisor): Delegating sub-task {i}/{len(sub_tasks)}: {sub_task[:100]}...\033[0m")
            await event_broker.publish({"stage": "subagent_supervisor", "message": f"Delegating sub-task {i}", "sub_task": sub_task})
            # Send the task to the sub-agent
            await subsupervisor_to_subagent.put(sub_task)
            #print(f"\033[92mSubAgent (Supervisor): Sent sub-task {i} for processing\033[0m") # Less verbose log
            # Collect result from sub-agent (using the correct queue)
            print(f"\033[96mSubAgent (Supervisor): Waiting for result of sub-task {i}...\033[0m")
            # No need for timeout here if subagent handles its own retries/failure
            result = await subagent_to_subsupervisor.get()
            print(f"\033[96mSubAgent (Supervisor): Received result for sub-task {i}.\033[0m") # Result too long for log
            await event_broker.publish({
                "stage": "subagent_supervisor",
                "message": f"Completed sub-task {i}",
                "result": result
            })
            sub_results.append(f"--- Result for Sub-Task {i}: {sub_task} ---\n{result}")
        except Exception as e:
            print(f"\033[91mSubAgent Supervisor: Error processing sub-task {i} ('{sub_task[:50]}...'): {e}\033[0m")
            logging.error(f"SubAgent Supervisor error processing sub-task {i}", exc_info=True)
            # Record the failure in place of the result so the final report
            # still accounts for every sub-task.
            sub_results.append(f"--- Error for Sub-Task {i}: {sub_task} ---\nFailed due to supervisor error: {e}")
            # Decide if we should continue with other subtasks or abort? Let's continue for now.
    # Combine and return results
    combined_result = "\n\n".join(sub_results)
    try:
        print("\033[96mSubAgent (Supervisor): All sub-tasks processed. Sending combined result UP to Agent Supervisor.\033[0m")
        await subsupervisor_to_supervisor.put(combined_result) # --- Send to the NEW queue ---
        await event_broker.publish({"stage": "subagent_supervisor", "message": "Sent combined result", "result": combined_result})
    except Exception as e:
        print(f"\033[91mSubAgent Supervisor: Error sending combined result UP: {e}\033[0m")
        logging.error("SubAgent Supervisor error sending final result", exc_info=True)
        # If sending fails, the main supervisor will hang. Not much we can do here unless we add another error path.
# SubAgent: Executes sub-tasks using Ollama LLM
async def subagent():
    """Long-lived worker: repeatedly pull a sub-task from the queue, run it
    through the LLM with retries, and push a result (or failure notice) back
    so the sub-supervisor always gets exactly one reply per sub-task."""
    ollama_client = AsyncClient(host=OLLAMA_HOST)
    max_retries = 3
    # Infinite loop: this worker stays alive to serve sub-tasks one at a time.
    while True:
        task = await subsupervisor_to_subagent.get()
        print(f"\033[91m SubAgent: Received sub-task - {task[:100]}...\033[0m")
        for attempt_num in range(1, max_retries + 1):
            try:
                print(f"\033[91m SubAgent: Executing sub-task (Attempt {attempt_num}/{max_retries})...\033[0m")
                await event_broker.publish({"stage": "subagent", "message": "Executing sub-task", "task": task, "attempt": attempt_num})
                # Hand the raw sub-task text to the LLM as one user message.
                user_message = {"role": "user", "content": task}
                response = await ollama_client.chat(model=OLLAMA_MODEL_SUBAGENT1, messages=[user_message])
                result = response["message"]["content"]
                print("\033[91m SubAgent: Sub-task completed successfully. Returning result.\033[0m")
                await event_broker.publish({"stage": "subagent", "message": "Sub-task completed", "result": result})
                await subagent_to_subsupervisor.put(result)
                break  # success: go wait for the next sub-task
            except aiohttp.ClientError as e:
                print(f"\033[91m SubAgent: Network Error on attempt {attempt_num}/{max_retries} - {str(e)}\033[0m")
                await event_broker.publish({"stage": "subagent", "message": "Network Error", "error": str(e), "attempt": attempt_num})
                if attempt_num == max_retries:
                    print("\033[91m SubAgent: Max retries reached, sending failure message.\033[0m")
                    await event_broker.publish({"stage": "subagent", "message": "Max retries failed"})
                    await subagent_to_subsupervisor.put(f"SubAgent failed to execute sub-task '{task[:50]}...' after {max_retries} retries due to network errors.")
                    break  # final failure: report and move on
                # Exponential backoff between network retries: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** (attempt_num - 1))
            except Exception as e:
                print(f"\033[91m SubAgent: Unexpected Error on attempt {attempt_num}/{max_retries} - {str(e)}\033[0m")
                logging.error("Subagent unexpected error", exc_info=True)
                await event_broker.publish({"stage": "subagent", "message": "Unexpected Error", "error": str(e), "attempt": attempt_num})
                if attempt_num == max_retries:
                    print("\033[91m SubAgent: Max retries reached after unexpected error, sending failure message.\033[0m")
                    await event_broker.publish({"stage": "subagent", "message": "Max retries failed"})
                    await subagent_to_subsupervisor.put(f"SubAgent failed to execute sub-task '{task[:50]}...' after {max_retries} retries due to unexpected error: {str(e)}")
                    break  # final failure: report and move on
                await asyncio.sleep(1)  # brief fixed pause before retrying
# Final synthesis
async def synthesize_final_report(report_content: str, original_query: str) -> str:
    """
    Uses an LLM to synthesize the combined report content into a final,
    coherent response in the language of the original query.

    Args:
        report_content: concatenated main-task and sub-task results.
        original_query: the user's original command, included in the prompt
            so the model can match its language and context.

    Returns:
        The synthesized report text.

    Raises:
        ConnectionError: after exhausting retries on network errors.
        RuntimeError: after exhausting retries on any other error.
    """
    ollama_client = AsyncClient(host=OLLAMA_HOST)
    max_retries = 2 # Synthesis is important, but maybe fewer retries than core tasks
    # The prompt lines below are deliberately left at column 0 so the string
    # sent to the model carries no leading indentation.
    synthesis_prompt = f"""
Original User Query (for language and context): "{original_query}"
Combined Report Content (from main task and sub-tasks):
--- START REPORT CONTENT ---
{report_content}
--- END REPORT CONTENT ---
Your Task:
1. Synthesize the above 'Combined Report Content' into a single, comprehensive, and coherent final report.
2. Remove redundancy and logically integrate information from the different sections (main report and sub-tasks).
3. IMPORTANT: Ensure the *entire* final output is written in the *same language* as the 'Original User Query'.
4. Structure the final report clearly with appropriate headings or paragraphs if needed.
5. Do NOT include explanatory phrases about your process (e.g., "Here is the synthesized report:"). Just output the final report itself.
"""
    message = {"role": "user", "content": synthesis_prompt}
    for attempt in range(max_retries):
        try:
            print(f"\033[97mSynthesizer: Calling LLM (Attempt {attempt + 1}/{max_retries})...\033[0m")
            # Temperature 0.7 here, unlike the 0.0 used for decomposition.
            response = await ollama_client.chat(model=OLLAMA_MODEL_SYNTHESIZE, messages=[message], options={"temperature": 0.7, "seed": 1234567890})
            synthesized_result = response["message"]["content"]
            print("\033[97mSynthesizer: LLM call successful.\033[0m")
            return synthesized_result
        except aiohttp.ClientError as e:
            print(f"\033[91mSynthesizer: Network Error on attempt {attempt + 1}/{max_retries} - {str(e)}\033[0m")
            if attempt == max_retries - 1:
                raise ConnectionError(f"Synthesis failed after {max_retries} network retries: {e}") from e
            await asyncio.sleep(2**(attempt))
        except Exception as e:
            print(f"\033[91mSynthesizer: Unexpected Error on attempt {attempt + 1}/{max_retries} - {str(e)}\033[0m")
            logging.error("Synthesizer unexpected error", exc_info=True)
            if attempt == max_retries - 1:
                raise RuntimeError(f"Synthesis failed after {max_retries} unexpected error retries: {e}") from e
            await asyncio.sleep(1)
    # Should not be reachable if max_retries > 0, but added for safety
    raise RuntimeError("Synthesis failed after all retries.")
# Main function to run all components concurrently
async def main():
    """Start every pipeline component, wait for the client to finish, then
    shut the remaining workers down cleanly.

    The client task's completion marks the end of the user-request flow; the
    other components (some of which loop forever, e.g. subagent) are then
    cancelled and awaited so cancellation completes before the loop closes.
    """
    print("Starting components...")
    client_task = asyncio.create_task(client())
    worker_tasks = [
        asyncio.create_task(server_endpoint()),
        asyncio.create_task(agent_supervisor()),
        asyncio.create_task(agent()),
        asyncio.create_task(subagent_supervisor()),
        asyncio.create_task(subagent()),  # Only need one subagent instance as it processes tasks sequentially
    ]
    print("Components started. Waiting for completion...")
    # Wait specifically for the client: it receives the final result, so its
    # completion represents the end of the whole request flow. Gathering all
    # tasks instead would hang on the subagent's infinite loop.
    await client_task
    print("Client task finished. Shutting down other tasks...")
    logging.info(f"Model --> {OLLAMA_MODEL_SYNTHESIZE}")
    # Cancel the still-running workers, then await them so cancellation has
    # actually completed before we return (deterministic, unlike a fixed
    # sleep); return_exceptions=True absorbs the resulting CancelledErrors.
    for worker in worker_tasks:
        if not worker.done():
            worker.cancel()
    await asyncio.gather(*worker_tasks, return_exceptions=True)
    print("Shutdown complete.")
# Run the script
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C instead of printing a traceback.
        print("\nCaught KeyboardInterrupt, exiting.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment