Created
January 13, 2024 09:37
-
-
Save Shulyaka/81284320c4f3999c176f9145aae9cd8d to your computer and use it in GitHub Desktop.
Interaction of two AI agents
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This is an example of teamwork between two OpenAI Assistants. They have been given the task of reviewing a test taken by a new employee candidate. The 'Reviewer' (aka Junior hiring manager) is given the instructions to review the test, and the 'Supervisor' (aka Senior hiring manager) is given the instructions to guide the Reviewer through the process, to ask him to review section by section, and to check his work. User and Reviewer only talk to Supervisor and not to each other.
As of Jan 2024, it is not perfect: the AI Agents do make mistakes and still require human intervention to complete the task correctly. However, this approach significantly reduces the number of interventions required and provides better results because of the self-check. The downside is that the number of tokens required is several times greater than with a single-agent approach.
The instructions are very custom and thus not included. But here is an example: | |
```supervisor_instructions.md | |
You are a senior hiring manager of a tech company. Your task is to supervise the work of a junior hiring manager. His task is to review the candidate's answers and rate each answer from 0 to 5 and provide a summarized feedback with total score for each section out of the max score. | |
The junior hiring manager sometimes makes errors. Sometimes he can't find the answer to a question when in fact it was provided by the candidate, sometimes he rates the wrong question, sometimes he only reviews part of the section he is asked to review, and occasionally he makes other errors. Your task is to guide him to review all sections of the test one by one and then also ask him to provide a short summary as his final report. Don't stop until the final report is provided or an unrecoverable state is detected. Check his response for mistakes and double check his findings. Check if all answers are reviewed. If he says the candidate did not provide the response for a question, double check yourself if the answer is really missing. If the junior hiring manager makes a mistake, ask him to correct it and guide him step-by-step how to do it until he does or until you verify there was no error. Quote the candidate's answer if needed. Don't rate the answers yourself, only verify the work of the junior hiring manager.
Use the appropriate function to talk to the junior hiring manager instead of asking the user. | |
He is given the same file with the candidate's answers as you. | |
``` | |
```reviewer_instructions.md | |
You are a hiring manager of a tech company. Your task is to review the candidate's answers and rate each answer from 0 to 5 and provide a summarized feedback with total score for each section out of the max score. Rate each question independently and honestly. You do not need to insert references to the document. If you find a sign of cheating, please include it into the feedback. | |
When in doubt, or you find that some background information is missing, you can ask for help from the user, who is your supervisor and a senior hiring manager, but don't take his opinion for granted and verify what he tells you. Be careful to not mistake the answer with part of the question. | |
<<< job description >>> | |
Please start by reviewing each section one by one, and once finished, provide a short summary as the final report: candidate name, the results of each section, the strongest points of the candidate, the weakest points of the candidate, and whether we should invite him for an online interview.
Here are some notes on the questions, use them during the review: | |
<<< Some guides on how to assess each question >>> | |
``` | |
""" | |
import logging | |
from time import sleep | |
from json import dumps as json_dumps, loads as json_loads | |
from pathlib import Path | |
import argparse | |
import openai | |
# Logging: INFO for everything, only warnings from the "httpx" logger, and
# DEBUG for this script's own logger.
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Model name passed to the API when an assistant has to be created.
model = "gpt-4-1106-preview"

# Assistant display names (also used to find existing assistants) and the
# first user message posted to each assistant's thread.
supervisor_agent_name = "GPT TQ Reviewer Supervisor"
supervisor_message = (
    "Please find the test of a new candidate. "
    "Please ask the Reviewer to review it and then obtain the final report."
)
reviewer_agent_name = "GPT TQ Reviewer"
reviewer_message = "Please find the test of a new candidate."
class RateLimitExceeded(RuntimeError):
    """Raised when a run ends with the error code ``rate_limit_exceeded``."""
class ServerError(RuntimeError):
    """Raised when a run ends with the error code ``server_error``."""
def _sgr(*codes):
    """Return the ANSI escape sequence ``ESC[<codes joined by ';'>m``."""
    return "\x1b[" + ";".join(codes) + "m"


# Terminal escape sequences used to colour the logged conversation.
RESET = _sgr("0")
BOLD = _sgr("1")
GREEN = _sgr("32", "20")
GREEN_BOLD = _sgr("32", "1")
BLUE = _sgr("34", "20")
PURPLE = _sgr("35", "20")
LBLUE = _sgr("36", "20")
LBLUE_BOLD = _sgr("36", "1")
def _parse_bool(value):
    """Convert a command-line string such as ``true`` or ``0`` to a bool.

    ``type=bool`` must not be used with argparse: ``bool("False")`` is True
    because any non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised boolean.
    """
    lowered = value.strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string stays falsy, as it was with ``type=bool``.
    if lowered in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Command-line interface. Every option is optional at the parser level and
# defaults to None when omitted.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--filename",
    help="PDF file to review",
)
parser.add_argument(
    "--continue_last_session",
    type=_parse_bool,
    # Accept both the bare flag and the legacy "--continue_last_session True".
    nargs="?",
    const=True,
    help="Continue last session",
)
parser.add_argument(
    "--supervisor_agent_id",
    help="Supervisor assistant ID",
)
parser.add_argument(
    "--reviewer_agent_id",
    help="Reviewer assistant ID",
)
parser.add_argument(
    "--supervisor_thread_id",
    help="Supervisor thread ID",
)
parser.add_argument(
    "--reviewer_thread_id",
    help="Reviewer thread ID",
)
# Parse the command line. The four IDs are None when not given, in which
# case they are looked up (or the objects are created) further down.
args = parser.parse_args()

# A new session uploads the file, so fail here rather than after assistants
# have been created and the previous session's threads have been deleted.
if not args.continue_last_session and not args.filename:
    parser.error("--filename is required unless --continue_last_session is set")

reviewer_agent_id = args.reviewer_agent_id
supervisor_agent_id = args.supervisor_agent_id
supervisor_thread_id = args.supervisor_thread_id
reviewer_thread_id = args.reviewer_thread_id

# Single API client shared by the rest of the script.
client = openai.OpenAI()
# Get or create Supervisor agent
#
# Resolution order:
#   1. --supervisor_agent_id, if given and retrievable;
#   2. otherwise (no ID given) the first existing assistant with the
#      Supervisor name;
#   3. otherwise a newly created assistant.
supervisor_agent = None
if supervisor_agent_id is None:
    # Iterating a list response from the SDK fetches further pages on
    # demand, and next() stops at the first match, so no page is requested
    # once the assistant has been found.
    supervisor_agent = next(
        (
            assistant
            for assistant in client.beta.assistants.list()
            if assistant.name == supervisor_agent_name
        ),
        None,
    )
    if supervisor_agent is not None:
        supervisor_agent_id = supervisor_agent.id
else:
    try:
        supervisor_agent = client.beta.assistants.retrieve(
            assistant_id=supervisor_agent_id
        )
    except openai.OpenAIError as e:
        # Best effort: an unusable ID falls through to creating a new agent.
        LOGGER.warning(str(e))
        supervisor_agent_id = None

if supervisor_agent_id is None:
    supervisor_agent = client.beta.assistants.create(
        model=model,
        instructions=Path("supervisor_instructions.md").read_text(),
        name=supervisor_agent_name,
        tools=[
            {"type": "retrieval"},
            {
                # The Supervisor's only channel to the Reviewer.
                "type": "function",
                "function": {
                    "name": "send_message",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "message": {
                                "type": "string",
                                "description": "The message text to send",
                            }
                        },
                        "required": ["message"],
                    },
                    "description": "Use this function to communicate to the junior hiring manager, write him a message and get a response",
                },
            },
        ],
    )
    supervisor_agent_id = supervisor_agent.id
# Get or create Reviewer agent
#
# Resolution order:
#   1. --reviewer_agent_id, or else the ID remembered in the Supervisor
#      assistant's metadata, if retrievable;
#   2. otherwise the first existing assistant with the Reviewer name;
#   3. otherwise a newly created assistant.
reviewer_agent = None
if reviewer_agent_id is None:
    reviewer_agent_id = (supervisor_agent.metadata or {}).get("reviewer_agent_id")

if reviewer_agent_id is not None:
    try:
        reviewer_agent = client.beta.assistants.retrieve(assistant_id=reviewer_agent_id)
    except openai.OpenAIError as e:
        # Best effort: an unusable ID falls through to the search by name.
        LOGGER.warning(str(e))
        reviewer_agent_id = None

if reviewer_agent_id is None:
    # Iterating a list response from the SDK fetches further pages on
    # demand, and next() stops at the first match.
    reviewer_agent = next(
        (
            assistant
            for assistant in client.beta.assistants.list()
            if assistant.name == reviewer_agent_name
        ),
        None,
    )
    if reviewer_agent is not None:
        reviewer_agent_id = reviewer_agent.id

if reviewer_agent_id is None:
    reviewer_agent = client.beta.assistants.create(
        model=model,
        instructions=Path("reviewer_instructions.md").read_text(),
        name=reviewer_agent_name,
        tools=[{"type": "retrieval"}],
    )
    reviewer_agent_id = reviewer_agent.id

# Remember the Reviewer on the Supervisor assistant so that later runs can
# find it without searching by name.
metadata = dict(supervisor_agent.metadata or {})
if metadata.get("reviewer_agent_id") != reviewer_agent_id:
    metadata["reviewer_agent_id"] = reviewer_agent_id
    supervisor_agent = client.beta.assistants.update(
        assistant_id=supervisor_agent_id, metadata=metadata
    )
# Get last supervisor thread
# The ID comes from --supervisor_thread_id or, failing that, from the
# Supervisor assistant's metadata.
supervisor_thread = None
if supervisor_thread_id is None:
    supervisor_thread_id = (supervisor_agent.metadata or {}).get(
        "supervisor_thread_id"
    )
if supervisor_thread_id is not None:
    try:
        supervisor_thread = client.beta.threads.retrieve(thread_id=supervisor_thread_id)
    except openai.OpenAIError as e:
        # Best effort: a thread that cannot be retrieved is treated as absent.
        LOGGER.warning(str(e))
        supervisor_thread_id = None

# Get last reviewer thread
# The ID comes from --reviewer_thread_id or, failing that, from the
# supervisor thread's metadata.
reviewer_thread = None
if reviewer_thread_id is None and supervisor_thread is not None:
    reviewer_thread_id = (supervisor_thread.metadata or {}).get("reviewer_thread_id")
if reviewer_thread_id is not None:
    try:
        reviewer_thread = client.beta.threads.retrieve(thread_id=reviewer_thread_id)
    except openai.OpenAIError as e:
        LOGGER.warning(str(e))
        # Reset the reviewer's ID; the original reset supervisor_thread_id
        # here, a copy-paste slip from the block above.
        reviewer_thread_id = None
# Create new threads
if args.continue_last_session:
    # Continuing needs both threads of the previous session.
    if supervisor_thread is None or reviewer_thread is None:
        raise RuntimeError("Cannot find the thread ID to continue the last session")
else:
    if not args.filename:
        raise RuntimeError("--filename is required to start a new session")

    # Upload first: if the file cannot be read, the previous session is
    # still intact. The context manager closes the handle after the upload.
    with open(args.filename, "rb") as pdf:
        file = client.files.create(file=pdf, purpose="assistants")

    # Drop the previous session: its uploaded file and both threads.
    file_id = None
    if supervisor_thread is not None:
        file_id = (supervisor_thread.metadata or {}).get("file_id")
        if file_id is not None:
            try:
                client.files.delete(file_id=file_id)
            except openai.OpenAIError as e:
                # Best effort: failing to delete the old file is not fatal.
                LOGGER.debug("Could not delete file %s: %s", file_id, e)
        client.beta.threads.delete(thread_id=supervisor_thread.id)
    if reviewer_thread is not None:
        client.beta.threads.delete(thread_id=reviewer_thread.id)

    # Each thread starts with one user message carrying the uploaded file.
    reviewer_thread = client.beta.threads.create(
        messages=[
            openai.types.beta.thread_create_params.Message(
                content=reviewer_message,
                role="user",
                file_ids=[file.id],
            )
        ]
    )
    supervisor_thread = client.beta.threads.create(
        messages=[
            openai.types.beta.thread_create_params.Message(
                content=supervisor_message,
                role="user",
                file_ids=[file.id],
            )
        ],
        # Lets a later run find the reviewer thread and the uploaded file.
        metadata={"reviewer_thread_id": reviewer_thread.id, "file_id": file.id},
    )

    # Remember the supervisor thread on the Supervisor assistant.
    metadata = dict(supervisor_agent.metadata or {})
    metadata["supervisor_thread_id"] = supervisor_thread.id
    supervisor_agent = client.beta.assistants.update(
        assistant_id=supervisor_agent_id, metadata=metadata
    )
# Playground links for inspecting both conversations in the browser.
LOGGER.debug(
    "Supervisor debug URL: https://platform.openai.com/playground?assistant=%s&mode=assistant&thread=%s",
    supervisor_agent_id,
    supervisor_thread.id,
)
LOGGER.debug(
    "Reviewer debug URL: https://platform.openai.com/playground?assistant=%s&mode=assistant&thread=%s",
    reviewer_agent_id,
    reviewer_thread.id,
)

if args.continue_last_session:
    # Replay the supervisor thread so far. Thread messages are listed via
    # threads.messages.list; the original called assistants.list here, which
    # is the assistants listing endpoint rather than a thread's messages.
    # Iterating the list response fetches further pages on demand, and
    # order="asc" asks for oldest-first.
    for message in client.beta.threads.messages.list(
        thread_id=supervisor_thread.id, order="asc"
    ):
        LOGGER.info(message)
else:
    LOGGER.info(
        "User to Reviewer:\nPlease find the test of a new candidate. [File ID %s]",
        file.id,
    )
    LOGGER.info(
        "User to Supervisor:\nPlease find the test of a new candidate. Please ask the junior hiring manager to review it and then obtain the final report. [File ID %s]",
        file.id,
    )
def _tool_call_type(tool_call):
    """Return the ``type`` of a run-step tool call given as a dict or an object."""
    return tool_call["type"] if isinstance(tool_call, dict) else tool_call.type


def _log_new_steps(
    thread_id,
    run_id,
    seen_step_ids,
    message_format,
    message_color,
    retrieval_format,
    retrieval_color,
):
    """Log the run steps completed since the previous call.

    Args:
        thread_id: thread the run belongs to.
        run_id: run whose steps are inspected.
        seen_step_ids: set of step IDs already logged; updated in place.
        message_format: log format for an assistant text message
            (colour, text, reset).
        message_color: escape sequence placed before the message text.
        retrieval_format: log format for a retrieval tool call (colour, reset).
        retrieval_color: escape sequence placed before the retrieval note.

    Returns:
        The texts of the newly logged assistant messages, oldest step first.
    """
    texts = []
    # order="asc" asks for oldest-first so that several messages finished
    # between two polls are logged and returned chronologically.
    for step in client.beta.threads.runs.steps.list(
        thread_id=thread_id, run_id=run_id, order="asc"
    ):
        if step.status != "completed" or step.id in seen_step_ids:
            continue
        seen_step_ids.add(step.id)
        if step.type == "message_creation":
            message = client.beta.threads.messages.retrieve(
                thread_id=thread_id,
                message_id=step.step_details.message_creation.message_id,
            )
            # Only the first content part is considered, and only if it is
            # non-empty text; a message without content is skipped.
            if (
                message.role == "assistant"
                and message.content
                and message.content[0].type == "text"
                and message.content[0].text.value
            ):
                text = message.content[0].text.value
                LOGGER.info(message_format, message_color, text, RESET)
                texts.append(text)
        elif step.type == "tool_calls":
            for tool_call in step.step_details.tool_calls:
                if _tool_call_type(tool_call) == "retrieval":
                    LOGGER.info(retrieval_format, retrieval_color, RESET)
    return texts


def _raise_for_run(run, label):
    """Raise the exception for a run that stopped in an unexpected state.

    Raises:
        RateLimitExceeded: the run's last error code is ``rate_limit_exceeded``.
        ServerError: the run's last error code is ``server_error``.
        RuntimeError: any other error, or no error but an unexpected status.
    """
    error = run.last_error
    if error:
        if error.code == "rate_limit_exceeded":
            raise RateLimitExceeded(error.message)
        if error.code == "server_error":
            raise ServerError(error.message)
        raise RuntimeError(str(error))
    raise RuntimeError(label + " status = " + run.status)


def _ask_reviewer(question):
    """Post *question* to the reviewer thread, run the Reviewer, return its replies.

    Returns:
        List of the text messages the Reviewer produced during the run.

    Raises:
        RateLimitExceeded, ServerError, RuntimeError: the run did not complete.
    """
    LOGGER.info("Supervisor to Reviewer:\n%s%s%s", LBLUE, question, RESET)
    client.beta.threads.messages.create(
        thread_id=reviewer_thread.id,
        role="user",
        content=question,
    )
    reviewer_run = client.beta.threads.runs.create(
        thread_id=reviewer_thread.id,
        assistant_id=reviewer_agent_id,
    )
    seen_step_ids = set()
    replies = []
    while reviewer_run.status in ("queued", "in_progress"):
        sleep(3)
        reviewer_run = client.beta.threads.runs.retrieve(
            thread_id=reviewer_thread.id,
            run_id=reviewer_run.id,
        )
        # Steps are listed after the status was read, so the poll that sees
        # the final status also collects the last steps.
        replies.extend(
            _log_new_steps(
                reviewer_thread.id,
                reviewer_run.id,
                seen_step_ids,
                "Reviewer to Supervisor:\n%s%s%s",
                LBLUE_BOLD,
                "%s*Reviewer is checking the file*%s",
                BLUE,
            )
        )
    if reviewer_run.status != "completed":
        _raise_for_run(reviewer_run, "reviewer_run")
    return replies


# Run the supervisor thread
supervisor_run = client.beta.threads.runs.create(
    thread_id=supervisor_thread.id,
    assistant_id=supervisor_agent_id,
)
while True:  # one iteration per supervisor run (i.e. per user turn)
    supervisor_logged_steps = set()
    while True:  # poll the current run until it completes
        supervisor_run = client.beta.threads.runs.retrieve(
            thread_id=supervisor_thread.id, run_id=supervisor_run.id
        )
        _log_new_steps(
            supervisor_thread.id,
            supervisor_run.id,
            supervisor_logged_steps,
            "Supervisor to User:\n%s%s%s",
            GREEN_BOLD,
            "%s*Supervisor is checking the file*%s",
            PURPLE,
        )

        if supervisor_run.status in ("queued", "in_progress"):
            sleep(3)
            continue
        if supervisor_run.status == "completed":
            break
        if not (
            supervisor_run.status == "requires_action"
            and supervisor_run.required_action.type == "submit_tool_outputs"
        ):
            _raise_for_run(supervisor_run, "supervisor_run")

        # Supervisor sends message(s) to Reviewer: relay each one through the
        # reviewer thread and hand the replies back as the tool outputs.
        tool_outputs = []
        for (
            supervisor_tool_call
        ) in supervisor_run.required_action.submit_tool_outputs.tool_calls:
            # Explicit checks instead of assert, which is stripped under -O.
            if (
                supervisor_tool_call.type != "function"
                or supervisor_tool_call.function.name != "send_message"
            ):
                raise RuntimeError(
                    "Unexpected tool call from Supervisor: %r" % (supervisor_tool_call,)
                )
            messages = _ask_reviewer(
                json_loads(supervisor_tool_call.function.arguments)["message"]
            )
            # Save Reviewer response to Supervisor: a single reply is passed
            # as a plain string, anything else as a list.
            tool_outputs.append(
                openai.types.beta.threads.run_submit_tool_outputs_params.ToolOutput(
                    tool_call_id=supervisor_tool_call.id,
                    output=json_dumps(
                        {"response": messages if len(messages) != 1 else messages[0]}
                    ),
                )
            )
        client.beta.threads.runs.submit_tool_outputs(
            thread_id=supervisor_thread.id,
            run_id=supervisor_run.id,
            tool_outputs=tool_outputs,
        )

    # The run is complete: let the user reply to the Supervisor. An empty
    # line (or end of input) ends the session.
    LOGGER.info("User to Supervisor:")
    try:
        message = input("> ")
    except EOFError:
        message = ""
    if not message:
        break
    client.beta.threads.messages.create(
        thread_id=supervisor_thread.id,
        role="user",
        content=message,
    )
    supervisor_run = client.beta.threads.runs.create(
        thread_id=supervisor_thread.id,
        assistant_id=supervisor_agent_id,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment