
Commit 91a7188

amith-ajith and ekzhu authored
Add example using autogen-core and FastAPI for handoff multi-agent design pattern with streaming and UI (#6391)
## Why are these changes needed?

This PR adds an example which demonstrates how to build a streaming chat API with multi-turn conversation history and a simple web UI for the handoff multi-agent design pattern.

---------

Co-authored-by: Eric Zhu <[email protected]>
1 parent d96aaeb commit 91a7188

13 files changed: +980 -0 lines changed

@@ -0,0 +1 @@
model_config.yaml
README.md

@@ -0,0 +1,144 @@
# AutoGen-Core Streaming Chat with Multi-Agent Handoffs via FastAPI

This sample demonstrates how to build a streaming chat API featuring multi-agent handoffs and persistent conversation history using `autogen-core` and FastAPI. For more details on the handoff pattern, see the [AutoGen documentation](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/design-patterns/handoffs.html).

Inspired by `@ToryPan`'s example for streaming with the Core API.

## Key Features
1. **Streaming Response**: Implements real-time streaming of agent responses using FastAPI's `StreamingResponse`, `autogen-core`'s asynchronous features, and an `asyncio.Queue` to manage the data stream (a minimal sketch of this pattern follows the list).
2. **Multi-Agent Handoffs**: Showcases a system where different agents (Triage, Sales, Issues & Repairs) handle specific parts of a conversation, using tools (`delegate_tools`) to transfer the conversation between agents based on the context.
3. **Persistent Multi-Turn Conversation**: Agents receive and process conversation history, enabling context-aware interactions. History is saved per conversation ID in JSON files within the `chat_history` directory, allowing conversations to resume across sessions.
4. **Simple Web UI**: Includes a basic web interface (served via FastAPI's static files) for easy interaction with the chat system directly from a browser.

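The sketch below illustrates the queue-to-`StreamingResponse` mechanics from feature 1. It is a simplified stand-in, not the sample's actual `app.py`: the `fake_agent` coroutine and the `ChatRequest` model are hypothetical placeholders for the real agent runtime, and only the one-JSON-object-per-line output format matches the client examples later in this README.

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
STREAM_DONE = object()  # Sentinel object marking the end of one response stream.


class ChatRequest(BaseModel):
    message: str
    conversation_id: str


async def fake_agent(message: str, queue: asyncio.Queue) -> None:
    # Stand-in for the agent runtime: emit a few string chunks, then the sentinel.
    for word in f"Echo: {message}".split():
        await queue.put({"type": "string", "message": word + " "})
        await asyncio.sleep(0.05)
    await queue.put(STREAM_DONE)


@app.post("/chat/completions")
async def chat_completions(request: ChatRequest) -> StreamingResponse:
    queue: asyncio.Queue = asyncio.Queue()
    # In the real sample, this is where the user's message would be published to
    # the agent runtime; the agents then write their chunks onto the queue.
    asyncio.create_task(fake_agent(request.message, queue))

    async def stream():
        while True:
            item = await queue.get()
            if item is STREAM_DONE:
                break
            # One JSON object per line, mirroring the client examples below.
            yield json.dumps({"content": item}) + "\n"

    return StreamingResponse(stream(), media_type="application/json")
```
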
## File Structure
* `app.py`: Main FastAPI application code, including API endpoints, agent definitions, runtime setup, handoff logic, and streaming.
* `agent_user.py`: Defines the `UserAgent` responsible for interacting with the human user and saving chat history.
* `agent_base.py`: Defines the base `AIAgent` class used by specialized agents.
* `models.py`: Contains data models used for communication (e.g., `UserTask`, `AgentResponse`); a rough sketch follows this list.
* `topics.py`: Defines topic types used for routing messages between agents.
* `tools.py`: Defines tools that agents can execute (e.g., `execute_order_tool`).
* `tools_delegate.py`: Defines tools specifically for delegating/transferring the conversation to other agents.
* `README.md`: (This document) Project introduction and usage instructions.
* `static/`: Contains static files for the web UI (e.g., `index.html`).
* `model_config_template.yaml`: Template for the model configuration file.

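For orientation, here is a sketch of the message models and one delegate tool roughly along the lines of `models.py` and `tools_delegate.py`. The `UserTask` and `AgentResponse` fields are inferred from how they are used in `agent_base.py` (included further down), while `transfer_to_sales_agent` is a hypothetical example following the handoff pattern from the AutoGen documentation; the actual definitions in this sample may differ.

```python
from dataclasses import dataclass
from typing import List

from autogen_core.models import LLMMessage
from autogen_core.tools import FunctionTool


@dataclass
class UserTask:
    # Full conversation history handed from one agent to the next.
    context: List[LLMMessage]


@dataclass
class AgentResponse:
    # Topic type that follow-up user messages should go to, plus the updated history.
    reply_to_topic_type: str
    context: List[LLMMessage]


def transfer_to_sales_agent() -> str:
    # A delegate tool simply returns the topic type of the target agent; when the
    # LLM calls it, AIAgent publishes the task to that topic instead of replying.
    return "SalesAgent"


transfer_to_sales_agent_tool = FunctionTool(
    transfer_to_sales_agent, description="Use for anything sales or buying related."
)
```
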
## Installation
First, ensure you have Python installed (3.10 or higher, as required by `autogen-core`). Then, install the necessary libraries:

```bash
pip install "fastapi" "uvicorn[standard]" "autogen-core" "autogen-ext[openai]" "PyYAML"
```

## Configuration

Create a new file named `model_config.yaml` in the same directory as this README file to configure your language model settings (e.g., Azure OpenAI details). Use `model_config_template.yaml` as a starting point.

**Note**: For production, manage API keys securely using environment variables or other secrets management tools instead of hardcoding them in the configuration file.

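As an illustration only (the authoritative structure is in `model_config_template.yaml`), an OpenAI-style component configuration for `autogen-ext` typically looks roughly like this:

```yaml
# Hypothetical example; copy model_config_template.yaml for the exact fields.
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
  model: gpt-4o
  api_key: REPLACE_WITH_YOUR_API_KEY
```
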
## Running the Application
In the directory containing `app.py`, run the following command to start the FastAPI application:

```bash
uvicorn app:app --host 0.0.0.0 --port 8501 --reload
```

The application includes a simple web interface. After starting the server, navigate to `http://localhost:8501` in your browser.

The API endpoint for chat completions will be available at `http://localhost:8501/chat/completions`.

## Using the API
You can interact with the agent system by sending a POST request to the `/chat/completions` endpoint. The request body must be in JSON format and contain a `message` field (the user's input) and a `conversation_id` field to track the chat session.

**Request Body Format**:

```json
{
    "message": "I need a refund for a product.",
    "conversation_id": "user123_session456"
}
```

**Example (using curl)**:

```bash
curl -N -X POST http://localhost:8501/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Hi, I bought a rocket-powered unicycle and it exploded.",
    "conversation_id": "wile_e_coyote_1"
  }'
```

**Example (using Python requests)**:

```python
import requests
import json
import uuid

url = "http://localhost:8501/chat/completions"
conversation_id = f"conv-{uuid.uuid4()}"  # Generate a unique conversation ID; reuse the same ID to resume a session.

def send_message(message_text):
    data = {
        'message': message_text,
        'conversation_id': conversation_id
    }
    headers = {'Content-Type': 'application/json'}
    try:
        print(f"\n>>> User: {message_text}")
        print("<<< Assistant: ", end="", flush=True)
        response = requests.post(url, json=data, headers=headers, stream=True)
        response.raise_for_status()
        full_response = ""
        for chunk in response.iter_content(chunk_size=None):
            if not chunk:
                continue
            # Decode the chunk; it may contain several newline-delimited JSON objects.
            chunk_str = chunk.decode('utf-8')
            for line in chunk_str.strip().split('\n'):
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    # Each streamed object has the form {"content": {"type": ..., "message": ...}}.
                    if 'content' in data and isinstance(data['content'], dict) and 'message' in data['content']:
                        message_content = data['content']['message']
                        message_type = data['content'].get('type', 'string')  # Default to string if type is missing.

                        if message_type == 'function':
                            # Print function-call notifications on their own lines for clarity.
                            print(f"[{message_type.upper()}] {message_content}", end='\n', flush=True)
                            print("<<< Assistant: ", end="", flush=True)  # Reprint prefix for the next string part.
                        else:
                            print(message_content, end='', flush=True)

                        full_response += message_content  # Append only the message part.
                    else:
                        print(f"\nUnexpected chunk format: {line}")
                except json.JSONDecodeError:
                    print(f"\nError decoding line: '{line}'")

        print("\n--- End of Response ---")
        return full_response

    except requests.exceptions.RequestException as e:
        print(f"\nError: {e}")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")

# Start the conversation.
send_message("I want a refund")
# Continue the conversation (examples):
# send_message("I want the rocket my friend Amith bought.")
# send_message("They are the SpaceX 3000s")
# send_message("That sounds great, I'll take it!")
# send_message("Yes, I agree to the price and the caveat.")
```
agent_base.py

@@ -0,0 +1,134 @@
import json
from typing import List, Tuple

from autogen_core import (
    FunctionCall,
    MessageContext,
    RoutedAgent,
    TopicId,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    SystemMessage,
)
from autogen_core.tools import Tool
from models import UserTask, AgentResponse
import asyncio


class AIAgent(RoutedAgent):
    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        tools: List[Tool],
        delegate_tools: List[Tool],
        agent_topic_type: str,
        user_topic_type: str,
        response_queue: asyncio.Queue[str | object],
    ) -> None:
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        self._tools = dict([(tool.name, tool) for tool in tools])
        self._tool_schema = [tool.schema for tool in tools]
        self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
        self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type
        self._response_queue = response_queue

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
        # Start streaming LLM responses.
        llm_stream = self._model_client.create_stream(
            messages=[self._system_message] + message.context,
            tools=self._tool_schema + self._delegate_tool_schema,
            cancellation_token=ctx.cancellation_token,
        )
        final_response = None
        async for chunk in llm_stream:
            if isinstance(chunk, str):
                await self._response_queue.put({"type": "string", "message": chunk})
            else:
                final_response = chunk
        assert final_response is not None, "No response from model"
        print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True)
        # Process the LLM result.
        while isinstance(final_response.content, list) and all(isinstance(m, FunctionCall) for m in final_response.content):
            tool_call_results: List[FunctionExecutionResult] = []
            delegate_targets: List[Tuple[str, UserTask]] = []
            # Process each function call.
            for call in final_response.content:
                arguments = json.loads(call.arguments)
                await self._response_queue.put({"type": "function", "message": f"Executing {call.name}"})
                if call.name in self._tools:
                    # Execute the tool directly.
                    result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
                    result_as_str = self._tools[call.name].return_value_as_string(result)
                    tool_call_results.append(
                        FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name)
                    )
                elif call.name in self._delegate_tools:
                    # Execute the tool to get the delegate agent's topic type.
                    result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
                    topic_type = self._delegate_tools[call.name].return_value_as_string(result)
                    # Create the context for the delegate agent, including the function call and the result.
                    delegate_messages = list(message.context) + [
                        AssistantMessage(content=[call], source=self.id.type),
                        FunctionExecutionResultMessage(
                            content=[
                                FunctionExecutionResult(
                                    call_id=call.id,
                                    content=f"Transferred to {topic_type}. Adopt persona immediately.",
                                    is_error=False,
                                    name=call.name,
                                )
                            ]
                        ),
                    ]
                    delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
                else:
                    raise ValueError(f"Unknown tool: {call.name}")
            if len(delegate_targets) > 0:
                # Delegate the task to other agents by publishing messages to the corresponding topics.
                for topic_type, task in delegate_targets:
                    print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
                    await self._response_queue.put({"type": "function", "message": f"You are now talking to {topic_type}"})
                    await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # Make another LLM call with the results.
                message.context.extend([
                    AssistantMessage(content=final_response.content, source=self.id.type),
                    FunctionExecutionResultMessage(content=tool_call_results),
                ])
                llm_stream = self._model_client.create_stream(
                    messages=[self._system_message] + message.context,
                    tools=self._tool_schema + self._delegate_tool_schema,
                    cancellation_token=ctx.cancellation_token,
                )
                final_response = None
                async for chunk in llm_stream:
                    if isinstance(chunk, str):
                        await self._response_queue.put({"type": "string", "message": chunk})
                    else:
                        final_response = chunk
                assert final_response is not None, "No response from model"
                print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True)
            else:
                # The task has been delegated, so we are done.
                return
        # The task has been completed, publish the final result.
        assert isinstance(final_response.content, str)
        message.context.append(AssistantMessage(content=final_response.content, source=self.id.type))
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )
agent_user.py

@@ -0,0 +1,44 @@
from autogen_core import (
    MessageContext,
    RoutedAgent,
    message_handler,
)

from autogen_core.model_context import BufferedChatCompletionContext

from models import AgentResponse
import asyncio
import json
import os


class UserAgent(RoutedAgent):
    def __init__(self,
                 description: str,
                 user_topic_type: str,
                 agent_topic_type: str,
                 response_queue: asyncio.Queue[str | object],
                 stream_done: object) -> None:
        super().__init__(description)
        self._user_topic_type = user_topic_type
        self._agent_topic_type = agent_topic_type
        self._response_queue = response_queue
        self._STREAM_DONE = stream_done

    @message_handler
    async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
        # Save chat history: keep the last 10 messages of the conversation context.
        context = BufferedChatCompletionContext(buffer_size=10, initial_messages=message.context)
        save_context = await context.save_state()
        # Save the context to a JSON file keyed by the conversation (topic source) ID.
        chat_history_dir = "chat_history"
        os.makedirs(chat_history_dir, exist_ok=True)  # Ensure the directory exists before writing.
        if ctx.topic_id is None:
            raise ValueError("MessageContext.topic_id is None, cannot save chat history")
        file_path = os.path.join(chat_history_dir, f"history-{ctx.topic_id.source}.json")
        with open(file_path, 'w') as f:
            json.dump(save_context, f, indent=4)

        # End the stream so the API response can complete.
        await self._response_queue.put(self._STREAM_DONE)
