Last active
August 28, 2024 11:48
-
-
Save bonadio/2d548a493907c133bc10de806ecd08af to your computer and use it in GitHub Desktop.
A very basic implementation of Autogen with FastAPI, using a websocket to let a web app interact with the user_proxy agent.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import autogen | |
from user_proxy_webagent import UserProxyWebAgent | |
import asyncio | |
# LLM endpoint configuration shared by both agents.
# NOTE(review): no api_key is set here; presumably autogen falls back to the
# OPENAI_API_KEY environment variable set in main.py — confirm.
config_list = [
    {
        "model": "gpt-3.5-turbo",
        # "api_key": "<YOUR KEY HERE>"
    }
]

# JSON schema for the single tool the assistant is allowed to call.
_search_db_schema = {
    "name": "search_db",
    "description": "Search database for order status",
    "parameters": {
        "type": "object",
        "properties": {
            "order_number": {
                "type": "integer",
                "description": "Order number",
            },
            "customer_number": {
                "type": "string",
                "description": "Customer number",
            },
        },
        "required": ["order_number", "customer_number"],
    },
}

# Config for the assistant agent: deterministic sampling plus the tool schema.
llm_config_assistant = {
    "model": "gpt-3.5-turbo",
    "temperature": 0,
    "config_list": config_list,
    "functions": [_search_db_schema],
}

# Config for the user-proxy agent (no tools).
llm_config_proxy = {
    "model": "gpt-3.5-turbo-0613",
    "temperature": 0,
    "config_list": config_list,
}
############################################################################################# | |
# this is where you put your Autogen logic, here I have a simple 2 agents with a function call | |
class AutogenChat():
    """Wire a two-agent Autogen conversation (assistant + web user proxy)
    to a single websocket session via a pair of asyncio queues.

    One instance is created per websocket connection (see main.py).
    """

    def __init__(self, chat_id=None, websocket=None):
        # Identity of this chat session and the websocket that serves it.
        self.websocket = websocket
        self.chat_id = chat_id
        # client_sent_queue:    messages coming FROM the browser client.
        # client_receive_queue: messages going TO the browser client.
        self.client_sent_queue = asyncio.Queue()
        self.client_receive_queue = asyncio.Queue()
        self.assistant = autogen.AssistantAgent(
            name="assistant",
            llm_config=llm_config_assistant,
            system_message="""You are a helpful assistant, help the user find the status of his order.
            Only use the tools provided to do the search. Only execute the search after you have all the information needed.
            When you ask a question, always add the word "BRKT"" at the end.
            When you responde with the status add the word TERMINATE"""
        )
        self.user_proxy = UserProxyWebAgent(
            name="user_proxy",
            human_input_mode="ALWAYS",  # every assistant turn is routed to the human/browser
            max_consecutive_auto_reply=10,
            # Any message ending in TERMINATE (e.g. search_db's reply) ends the chat.
            is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
            code_execution_config=False,  # never execute model-generated code
            function_map={
                "search_db": self.search_db
            }
        )
        # add the queues so the proxy can talk to the websocket pump loops
        self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)

    async def start(self, message):
        """Kick off the agent conversation with the user's first message."""
        await self.user_proxy.a_initiate_chat(
            self.assistant,
            clear_history=True,
            message=message
        )

    # MOCK function call: always reports delivered. The trailing TERMINATE
    # triggers is_termination_msg above and ends the conversation.
    def search_db(self, order_number=None, customer_number=None):
        return "Order status: delivered TERMINATE"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# to test this code | |
# add your openai key to the config_list on autogen_chat.py | |
# run the following code in the same directory | |
# python main.py | |
# access http://localhost:8000 | |
# send the following message: | |
# send -> What is the status of my order? | |
# send -> order 111 | |
# send -> customer 222 | |
# the response should be Delivered | |
# send -> exit to end | |
# CTRL+C terminate the process | |
from fastapi import FastAPI, WebSocket, Request | |
from fastapi.responses import HTMLResponse | |
import uuid | |
from autogen_chat import AutogenChat | |
import asyncio | |
import uvicorn | |
from dotenv import load_dotenv, find_dotenv | |
import openai | |
import os | |
_ = load_dotenv(find_dotenv())  # read local .env file
# Fails fast with KeyError if the key is missing from the environment/.env.
openai.api_key = os.environ['OPENAI_API_KEY']
# openai.log='debug'
app = FastAPI()
# NOTE(review): this dict is never read or written anywhere in this file —
# looks vestigial; confirm before removing.
app.autogen_chat = {}
@app.get("/")
async def get(request: Request):
    """Serve a minimal single-page chat client.

    A fresh uuid1 chat id is generated per page load and baked into the
    page's websocket URL, so each browser tab gets its own session.
    NOTE(review): the ws:// URL is hard-coded to localhost:8000 — it will
    break behind a proxy or on another host; confirm before deploying.
    """
    chat_id = str(uuid.uuid1())
    html = f"""
    <!DOCTYPE html>
    <html>
        <head>
            <title>Chat</title>
        </head>
        <body>
            <h1>WebSocket Chat</h1>
            <h2>Your ID: <span id="ws-id"></span></h2>
            <form action="" onsubmit="sendMessage(event)">
                <input type="text" id="messageText" autocomplete="off"/>
                <button>Send</button>
            </form>
            <ul id='messages'>
            </ul>
            <script>
                function showMessage(msg) {{
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(msg)
                    message.appendChild(content)
                    messages.appendChild(message)
                }};
                var chat_id = "{chat_id}"
                document.querySelector("#ws-id").textContent = chat_id;
                var ws = new WebSocket("ws://localhost:8000/ws/{chat_id}");
                ws.onmessage = function(event) {{
                    showMessage(event.data)
                }};
                function sendMessage(event) {{
                    var input = document.getElementById("messageText")
                    ws.send(input.value)
                    showMessage(input.value)
                    input.value = ''
                    event.preventDefault()
                }}
            </script>
        </body>
    </html>
    """
    return HTMLResponse(html)
class ConnectionManager:
    """Registry of AutogenChat sessions with a currently open websocket."""

    def __init__(self):
        # Live sessions; one entry per accepted websocket.
        self.active_connections: list[AutogenChat] = []

    async def connect(self, autogen_chat: AutogenChat):
        """Accept the incoming websocket and register the session."""
        ws = autogen_chat.websocket
        await ws.accept()
        self.active_connections.append(autogen_chat)

    async def disconnect(self, autogen_chat: AutogenChat):
        """Tell the session's sender loop to finish, then forget the session."""
        queue = autogen_chat.client_receive_queue
        queue.put_nowait("DO_FINISH")
        print(f"autogen_chat {autogen_chat.chat_id} disconnected")
        self.active_connections.remove(autogen_chat)


manager = ConnectionManager()
async def send_to_client(autogen_chat: AutogenChat):
    """Pump agent replies from client_receive_queue out over the websocket.

    Runs until the "DO_FINISH" sentinel is dequeued.
    """
    queue = autogen_chat.client_receive_queue
    while True:
        reply = await queue.get()
        if reply == "DO_FINISH":
            queue.task_done()
            return
        await autogen_chat.websocket.send_text(reply)
        queue.task_done()
        # Brief pause between sends, as in the receive loop.
        await asyncio.sleep(0.05)
async def receive_from_client(autogen_chat: AutogenChat):
    """Pump browser messages from the websocket into client_sent_queue.

    On the "DO_FINISH" sentinel, forward it to BOTH queues (so the sender
    loop and any agent waiting for human input both unblock) and stop.
    """
    while True:
        message = await autogen_chat.websocket.receive_text()
        if message == "DO_FINISH":
            await autogen_chat.client_receive_queue.put("DO_FINISH")
            await autogen_chat.client_sent_queue.put("DO_FINISH")
            return
        await autogen_chat.client_sent_queue.put(message)
        # Brief pause between reads, as in the send loop.
        await asyncio.sleep(0.05)
@app.websocket("/ws/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
    """Handle one chat session over a websocket.

    The first client message starts the agent conversation; while it runs,
    two pump tasks shuttle messages between the websocket and the session's
    queues. Always disconnects the session on the way out.
    """
    autogen_chat = None
    future_calls = None
    try:
        autogen_chat = AutogenChat(chat_id=chat_id, websocket=websocket)
        await manager.connect(autogen_chat)
        # First message from the client seeds the conversation.
        data = await autogen_chat.websocket.receive_text()
        # Run the two pump loops concurrently with the chat itself.
        future_calls = asyncio.gather(send_to_client(autogen_chat), receive_from_client(autogen_chat))
        await autogen_chat.start(data)
        print("DO_FINISHED")
    except Exception as e:
        print("ERROR", str(e))
    finally:
        # BUG FIX: the pump tasks were previously left running after the chat
        # ended; cancel them so they don't leak on every closed session.
        if future_calls is not None:
            future_calls.cancel()
        if autogen_chat is not None:
            try:
                await manager.disconnect(autogen_chat)
            except Exception:
                # Best-effort cleanup: the session may already be gone.
                pass
# Run the dev server directly: python main.py, then open http://localhost:8000
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import autogen | |
from autogen import Agent, ConversableAgent | |
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union | |
# Optional dependency: fall back to a no-op "colored" when termcolor is absent.
try:
    from termcolor import colored
except ImportError:

    def colored(text, *args, **kwargs):
        """Identity stand-in for termcolor.colored (extra args are ignored)."""
        return text
class UserProxyWebAgent(autogen.UserProxyAgent):
    """A UserProxyAgent whose "human input" comes from a websocket client.

    Messages to the human are pushed onto client_receive_queue and the
    reply is awaited on client_sent_queue (see set_queues), instead of
    blocking on stdin.
    """

    def __init__(self, *args, **kwargs):
        super(UserProxyWebAgent, self).__init__(*args, **kwargs)
        # Drop the reply handlers registered by the parent and re-register
        # them so that the blocking termination/human-reply check is replaced
        # by the async, queue-backed a_check_termination_and_human_reply below.
        # NOTE(review): the effective handler order depends on autogen's
        # register_reply insertion semantics — confirm against the autogen
        # version in use.
        self._reply_func_list = []
        self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
        self.register_reply([Agent, None], UserProxyWebAgent.a_check_termination_and_human_reply)

    async def a_check_termination_and_human_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[Agent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[str, Dict, None]]:
        """Check if the conversation should be terminated, and if human reply is provided.

        Async variant of the parent's check so the human reply can be awaited
        from the websocket queues via a_get_human_input. Returns a
        (final, reply) pair: final=True with reply=None ends the conversation;
        final=True with a string sends that string as the human's reply;
        (False, None) falls through to auto-reply.
        """
        if config is None:
            config = self
        if messages is None:
            messages = self._oai_messages[sender]
        message = messages[-1]
        reply = ""
        no_human_input_msg = ""
        if self.human_input_mode == "ALWAYS":
            # Ask the web client on every turn.
            reply = await self.a_get_human_input(
                f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: "
            )
            no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
            # if the human input is empty, and the message is a termination message, then we will terminate the conversation
            reply = reply if reply or not self._is_termination_msg(message) else "exit"
        else:
            # Auto-reply budget exhausted: consult the human (or exit in NEVER mode).
            if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    terminate = self._is_termination_msg(message)
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                        if terminate
                        else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply if reply or not terminate else "exit"
            elif self._is_termination_msg(message):
                # Termination message seen before the auto-reply budget ran out.
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply or "exit"
        # print the no_human_input_msg
        if no_human_input_msg:
            print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
        # stop the conversation
        if reply == "exit":
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, None
        # send the human reply
        if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, reply
        # increment the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] += 1
        if self.human_input_mode != "NEVER":
            print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
        return False, None

    def set_queues(self, client_sent_queue, client_receive_queue):
        """Attach the asyncio queues used to exchange messages with the web client."""
        self.client_sent_queue = client_sent_queue
        self.client_receive_queue = client_receive_queue

    async def a_get_human_input(self, prompt: str) -> Union[str, None]:
        """Forward the last agent message to the client and await its reply.

        The *prompt* argument is ignored; the last message content is sent
        instead. Returns "exit" when the client sent the DO_FINISH sentinel.
        NOTE(review): returns None when the last message has empty content —
        the caller above treats a falsy reply as "no human input"; confirm
        this is intended rather than returning "".
        """
        last_message = self.last_message()
        if last_message["content"]:
            await self.client_receive_queue.put(last_message["content"])
            reply = await self.client_sent_queue.get()
            if reply and reply == "DO_FINISH":
                return "exit"
            return reply
        else:
            return
Check the full demo
Autogen with FastApi backend and React frontend
https://github.com/bonadio/autogenwebdemo
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a new version that uses only asyncio, with no threads.