I am fairly new to programming and need some guidance. Hoping to find some help here. I have an Autogen Group chat I am trying to connect to a react frontend via FastAPI and Websockets. I can get it to work and return the responses to the front end with just a User_proxy and one assistant. Like so:
autogen_chat.py:
import autogen
from user_proxy_webagent import UserProxyWebAgent
import asyncio
config_list = [
{
"model": "gpt-3.5-turbo",
}
]
llm_config = {
"model":"gpt-3.5-turbo-0613",
"temperature": 0,
"config_list": config_list,
}
class AutogenChat():
def __init__(self, chat_id=None, websocket=None):
self.websocket = websocket
self.chat_id = chat_id
self.client_sent_queue = asyncio.Queue()
self.client_receive_queue = asyncio.Queue()
self.assistant = autogen.AssistantAgent(
name="assistant",
llm_config=llm_config,
system_message="""You are a helpful assistant"""
)
self.user_proxy = UserProxyWebAgent(
name="user_proxy",
human_input_mode="ALWAYS",
max_consecutive_auto_reply=10,
is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
code_execution_config=False,
)
# add the queues to communicate
self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)
async def start(self, message):
await self.user_proxy.a_initiate_chat(
self.assistant,
clear_history=True,
message=message
)
user_proxy_webagent.py:
import autogen
from autogen import Agent, ConversableAgent
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
try:
from termcolor import colored
except ImportError:
def colored(x, *args, **kwargs):
return x
class UserProxyWebAgent(autogen.UserProxyAgent):
def __init__(self, *args, **kwargs):
super(UserProxyWebAgent, self).__init__(*args, **kwargs)
self._reply_func_list = []
self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
self.register_reply([Agent, None], UserProxyWebAgent.a_check_termination_and_human_reply)
async def a_check_termination_and_human_reply(
self,
messages: Optional[List[Dict]] = None,
sender: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
"""Check if the conversation should be terminated, and if human reply is provided."""
if config is None:
config = self
if messages is None:
messages = self._oai_messages[sender]
message = messages[-1]
reply = ""
no_human_input_msg = ""
if self.human_input_mode == "ALWAYS":
reply = await self.a_get_human_input(
f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply if reply or not self._is_termination_msg(message) else "exit"
else:
if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
if self.human_input_mode == "NEVER":
reply = "exit"
else:
# self.human_input_mode == "TERMINATE":
terminate = self._is_termination_msg(message)
reply = await self.a_get_human_input(
f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
if terminate
else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply if reply or not terminate else "exit"
elif self._is_termination_msg(message):
if self.human_input_mode == "NEVER":
reply = "exit"
else:
# self.human_input_mode == "TERMINATE":
reply = await self.a_get_human_input(
f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply or "exit"
# print the no_human_input_msg
if no_human_input_msg:
print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
# stop the conversation
if reply == "exit":
# reset the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] = 0
return True, None
# send the human reply
if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
# reset the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] = 0
return True, reply
# increment the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] += 1
if self.human_input_mode != "NEVER":
print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
return False, None
def set_queues(self, client_sent_queue, client_receive_queue):
self.client_sent_queue = client_sent_queue
self.client_receive_queue = client_receive_queue
async def a_get_human_input(self, prompt: str) -> str:
last_message = self.last_message()
if last_message["content"]:
await self.client_receive_queue.put(last_message["content"])
reply = await self.client_sent_queue.get()
if reply and reply == "DO_FINISH":
return "exit"
return reply
else:
return
main.py:
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
import uuid
from autogen_group_chat import AutogenChat
import asyncio
import uvicorn
from dotenv import load_dotenv, find_dotenv
import openai
import os
_ = load_dotenv(find_dotenv()) # read local .env file
openai.api_key = os.environ['OPENAI_API_KEY']
# openai.log='debug'
app = FastAPI()
app.autogen_chat = {}
class ConnectionManager:
def __init__(self):
self.active_connections: list[AutogenChat] = []
async def connect(self, autogen_chat: AutogenChat):
await autogen_chat.websocket.accept()
self.active_connections.append(autogen_chat)
async def disconnect(self, autogen_chat: AutogenChat):
autogen_chat.client_receive_queue.put_nowait("DO_FINISH")
print(f"autogen_chat {autogen_chat.chat_id} disconnected")
self.active_connections.remove(autogen_chat)
manager = ConnectionManager()
async def send_to_client(autogen_chat: AutogenChat):
while True:
reply = await autogen_chat.client_receive_queue.get()
if reply and reply == "DO_FINISH":
autogen_chat.client_receive_queue.task_done()
break
await autogen_chat.websocket.send_text(reply)
autogen_chat.client_receive_queue.task_done()
await asyncio.sleep(0.05)
async def receive_from_client(autogen_chat: AutogenChat):
while True:
data = await autogen_chat.websocket.receive_text()
if data and data == "DO_FINISH":
await autogen_chat.client_receive_queue.put("DO_FINISH")
await autogen_chat.client_sent_queue.put("DO_FINISH")
break
await autogen_chat.client_sent_queue.put(data)
await asyncio.sleep(0.05)
@app.websocket("/ws/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
try:
autogen_chat = AutogenChat(chat_id=chat_id, websocket=websocket)
await manager.connect(autogen_chat)
data = await autogen_chat.websocket.receive_text()
future_calls = asyncio.gather(send_to_client(autogen_chat), receive_from_client(autogen_chat))
await autogen_chat.start(data)
print("DO_FINISHED")
except Exception as e:
print("ERROR", str(e))
finally:
try:
await manager.disconnect(autogen_chat)
except:
pass
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
But when I try to add the group chat the interface disconnects. I want it to return each response of all the Assisants one at a time providing all of them the ability to ask questions back to the user. I use the same files as above and just replace the autogen_chat.py with autogen_groupchat in the main.py file.
autogen_groupchat.py:
import autogen
from user_proxy_webagent import UserProxyWebAgent
import asyncio
config_list = [
{
"model": "gpt-3.5-turbo",
}
]
llm_config_assistant = {
"model":"gpt-3.5-turbo",
"temperature": 0,
"config_list": config_list,
}
llm_config_proxy = {
"model":"gpt-3.5-turbo-0613",
"temperature": 0,
"config_list": config_list,
}
class AutogenChat():
def __init__(self, chat_id=None, websocket=None):
self.websocket = websocket
self.chat_id = chat_id
self.client_sent_queue = asyncio.Queue()
self.client_receive_queue = asyncio.Queue()
self.diagnosis = autogen.AssistantAgent(
name="specialist",
llm_config=llm_config_assistant,
max_consecutive_auto_reply=0,
)
self.instructions = autogen.AssistantAgent(
name="instruction_writer",
llm_config=llm_config_assistant,
max_consecutive_auto_reply=0,
)
self.sourcing = autogen.AssistantAgent(
name="sourcing_agent",
llm_config=llm_config_assistant,
max_consecutive_auto_reply=0,
)
self.user_proxy = UserProxyWebAgent(
name="user_proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
code_execution_config=False,
)
# add the queues to communicate
self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)
self.groupchat = autogen.GroupChat(agents=[self.diagnosis, self.instructions, self.sourcing], messages=[], max_round=20)
self.manager = autogen.GroupChatManager(groupchat=self.groupchat,
llm_config=llm_config_assistant,
human_input_mode="NEVER" )
async def start(self, message):
await self.user_proxy.a_initiate_chat(
self.manager,
clear_history=True,
message=message
)
Is this the best approach to accomplish this? If so can someone provide me some guidance on how to get this operating properly. Thank you!
For an alternative approach, consider monkey patching the
_print_received_messagein conversable_agent.py. For an in-depth tutorial on implementing this solution, refer to the video at this link: https://www.youtube.com/watch?v=dW-qr_ntOgc&t=173s