Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 71 additions & 10 deletions python/semantic_kernel/agents/orchestration/group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ class GroupChatResponseMessage(KernelBaseModel):
body: ChatMessageContent


@experimental
class GroupChatHistorySyncMessage(KernelBaseModel):
"""A message type to synchronize chat history from manager to agents."""

history: list[ChatMessageContent]


_TGroupChatManagerResult = TypeVar("_TGroupChatManagerResult", ChatMessageContent, str, bool)


Expand Down Expand Up @@ -119,6 +126,14 @@ async def _handle_response_message(self, message: GroupChatResponseMessage, ctx:
logger.debug(f"{self.id}: Received group chat response message.")
self._message_cache.add_message(message.body)

@message_handler
async def _handle_history_sync_message(self, message: GroupChatHistorySyncMessage, ctx: MessageContext) -> None:
"""Handle chat history synchronization from the manager."""
logger.debug(f"{self.id}: Received group chat history sync message with {len(message.history)} messages.")
self._message_cache.clear()
for msg in message.history:
self._message_cache.add_message(msg)

@message_handler
async def _handle_request_message(self, message: GroupChatRequestMessage, ctx: MessageContext) -> None:
if message.agent_name != self._agent.name:
Expand Down Expand Up @@ -259,6 +274,7 @@ def __init__(
participant_descriptions: dict[str, str],
exception_callback: Callable[[BaseException], None],
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
chat_history: ChatHistory | None = None,
):
"""Initialize the group chat manager actor.

Expand All @@ -268,40 +284,74 @@ def __init__(
participant_descriptions (dict[str, str]): The descriptions of the participants in the group chat.
exception_callback (Callable[[BaseException], None]): A function that is called when an exception occurs.
result_callback (Callable | None): A function that is called when the group chat manager produces a result.
chat_history (ChatHistory | None): A chat history instance to use. If None, creates a new ChatHistory.
This can be a ChatHistoryReducer subclass for context management.
"""
self._manager = manager
self._internal_topic_type = internal_topic_type
self._chat_history = ChatHistory()
self._chat_history = chat_history if chat_history is not None else ChatHistory()
self._participant_descriptions = participant_descriptions
self._result_callback = result_callback

super().__init__("An actor for the group chat manager.", exception_callback)

async def _sync_history_to_agents(self, cancellation_token: CancellationToken) -> None:
"""Synchronize the manager's chat history to all agents."""
# FIXED: Publish sync message and add small delay to ensure processing completes
# before request message is published (avoids race condition in concurrent handlers)
await self.publish_message(
GroupChatHistorySyncMessage(history=self._chat_history.messages),
TopicId(self._internal_topic_type, self.id.key),
cancellation_token=cancellation_token,
)

# Small delay to allow sync message processing to complete
# This ensures agent caches are populated before request processing
await asyncio.sleep(0.01) # 10ms should be sufficient for message processing

@message_handler
async def _handle_start_message(self, message: GroupChatStartMessage, ctx: MessageContext) -> None:
"""Handle the start message for the group chat."""
logger.debug(f"{self.id}: Received group chat start message.")
if isinstance(message.body, ChatMessageContent):
self._chat_history.add_message(message.body)
# Use add_message_async for reducers, fallback to add_message for regular ChatHistory
if hasattr(self._chat_history, "add_message_async"):
await self._chat_history.add_message_async(message.body)
else:
self._chat_history.add_message(message.body)
elif isinstance(message.body, list) and all(isinstance(m, ChatMessageContent) for m in message.body):
for m in message.body:
self._chat_history.add_message(m)
if hasattr(self._chat_history, "add_message_async"):
await self._chat_history.add_message_async(m)
else:
self._chat_history.add_message(m)
else:
raise ValueError(f"Invalid message body type: {type(message.body)}. Expected {DefaultTypeAlias}.")

# Sync history to agents after adding messages
await self._sync_history_to_agents(ctx.cancellation_token)
await self._determine_state_and_take_action(ctx.cancellation_token)

@message_handler
async def _handle_response_message(self, message: GroupChatResponseMessage, ctx: MessageContext) -> None:
if message.body.role != AuthorRole.USER:
self._chat_history.add_message(
ChatMessageContent(
role=AuthorRole.USER,
content=f"Transferred to {message.body.name}",
)
transfer_message = ChatMessageContent(
role=AuthorRole.USER,
content=f"Transferred to {message.body.name}",
)
self._chat_history.add_message(message.body)
if hasattr(self._chat_history, "add_message_async"):
await self._chat_history.add_message_async(transfer_message)
else:
self._chat_history.add_message(transfer_message)

# Add the main message
if hasattr(self._chat_history, "add_message_async"):
await self._chat_history.add_message_async(message.body)
else:
self._chat_history.add_message(message.body)

# Sync history to agents after adding messages
await self._sync_history_to_agents(ctx.cancellation_token)
await self._determine_state_and_take_action(ctx.cancellation_token)

@ActorBase.exception_handler
Expand All @@ -314,7 +364,13 @@ async def _determine_state_and_take_action(self, cancellation_token: Cancellatio
if should_request_user_input.result and self._manager.human_response_function:
logger.debug(f"Group chat manager requested user input. Reason: {should_request_user_input.reason}")
user_input_message = await self._call_human_response_function()
self._chat_history.add_message(user_input_message)
# Add user input message with proper async handling for reducers
if hasattr(self._chat_history, "add_message_async"):
await self._chat_history.add_message_async(user_input_message)
else:
self._chat_history.add_message(user_input_message)
# Sync history to agents after adding user input
await self._sync_history_to_agents(cancellation_token)
await self.publish_message(
GroupChatResponseMessage(body=user_input_message),
TopicId(self._internal_topic_type, self.id.key),
Expand Down Expand Up @@ -377,6 +433,7 @@ def __init__(
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
| None = None,
chat_history: ChatHistory | None = None,
) -> None:
"""Initialize the group chat orchestration.

Expand All @@ -392,8 +449,11 @@ def __init__(
by the agents.
streaming_agent_response_callback (Callable | None): A function that is called when a streaming response
is produced by the agents.
chat_history (ChatHistory | None): A chat history instance to use for the manager. If None, creates a new
ChatHistory. This can be a ChatHistoryReducer subclass for context management.
"""
self._manager = manager
self._chat_history = chat_history

for member in members:
if member.description is None:
Expand Down Expand Up @@ -496,6 +556,7 @@ async def _register_manager(
participant_descriptions={agent.name: agent.description for agent in self._members}, # type: ignore[misc]
exception_callback=exception_callback,
result_callback=result_callback,
chat_history=self._chat_history,
),
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright (c) Microsoft. All rights reserved.

import sys

from semantic_kernel.agents.orchestration.group_chat import (
GroupChatOrchestration,
RoundRobinGroupChatManager,
)
from semantic_kernel.agents.runtime.in_process.in_process_runtime import InProcessRuntime
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.history_reducer.chat_history_truncation_reducer import ChatHistoryTruncationReducer
from tests.unit.agents.orchestration.conftest import MockAgent

if sys.version_info >= (3, 12):
pass # pragma: no cover
else:
pass # pragma: no cover


# region Tests with ChatHistoryTruncationReducer


async def test_group_chat_with_chat_history_parameter():
"""Test GroupChatOrchestration accepts chat_history parameter."""
agent = MockAgent(name="TestAgent", description="Test agent")
chat_history = ChatHistory()

# Should not raise an error
orchestration = GroupChatOrchestration(
members=[agent], manager=RoundRobinGroupChatManager(), chat_history=chat_history
)

assert orchestration._chat_history is chat_history


async def test_group_chat_with_truncation_reducer():
"""Test GroupChatOrchestration with ChatHistoryTruncationReducer."""
agent = MockAgent(name="TestAgent", description="Test agent")

# Create truncation reducer that keeps only 2 messages
reducer = ChatHistoryTruncationReducer(target_count=2, threshold_count=0, auto_reduce=True)

runtime = InProcessRuntime()
runtime.start()

try:
orchestration = GroupChatOrchestration(
members=[agent], manager=RoundRobinGroupChatManager(max_rounds=3), chat_history=reducer
)

orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime)
await orchestration_result.get(timeout=1.0)

# Verify the reducer was used and has messages
assert len(reducer.messages) > 0, "Reducer should have received messages"

# Verify truncation is working - should not exceed target_count + threshold_count + buffer
# (The exact count may vary due to "transfer" messages and timing)
assert len(reducer.messages) <= 5, "Reducer should limit message count"

finally:
await runtime.stop_when_idle()


async def test_group_chat_fallback_to_regular_chat_history():
"""Test that GroupChat falls back to add_message for regular ChatHistory."""
agent = MockAgent(name="TestAgent", description="Test agent")

# Use regular ChatHistory (no add_message_async method)
regular_history = ChatHistory()

runtime = InProcessRuntime()
runtime.start()

try:
# Should work without errors
orchestration = GroupChatOrchestration(
members=[agent], manager=RoundRobinGroupChatManager(max_rounds=1), chat_history=regular_history
)

orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime)
await orchestration_result.get(timeout=1.0)

# Verify messages were added to history
assert len(regular_history.messages) > 0, "Messages should be added to regular ChatHistory"

finally:
await runtime.stop_when_idle()


async def test_group_chat_no_chat_history_parameter():
"""Test GroupChatOrchestration works when no chat_history provided."""
agent = MockAgent(name="TestAgent", description="Test agent")

orchestration = GroupChatOrchestration(members=[agent], manager=RoundRobinGroupChatManager())

# Should not set chat_history in constructor when None provided
assert orchestration._chat_history is None


async def test_empty_reducer_not_replaced_by_default_history():
"""Test that empty ChatHistoryReducer instances are not replaced by default ChatHistory.

This is a regression test for the bug where empty reducers evaluated to False
and were replaced by ChatHistory() due to the 'chat_history or ChatHistory()' pattern.
"""
agent = MockAgent(name="TestAgent", description="Test agent")

# Create empty reducer (should evaluate to False in boolean context but still be used)
empty_reducer = ChatHistoryTruncationReducer(target_count=5, threshold_count=0, auto_reduce=True)

# Verify it's empty and evaluates to False initially
assert len(empty_reducer.messages) == 0
assert bool(empty_reducer) is False

runtime = InProcessRuntime()
runtime.start()

try:
orchestration = GroupChatOrchestration(
members=[agent], manager=RoundRobinGroupChatManager(max_rounds=1), chat_history=empty_reducer
)

orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime)
await orchestration_result.get(timeout=1.0)

# Verify messages were added to our reducer (not a default ChatHistory)
assert len(empty_reducer.messages) > 0, "Messages should be added to the reducer instance"

# Verify we can access reducer-specific functionality (proves it's our reducer)
assert hasattr(empty_reducer, "auto_reduce"), "Should still be our reducer instance"
assert empty_reducer.auto_reduce is True, "Should have reducer-specific properties"
assert empty_reducer.target_count == 5, "Should have our original target_count"

finally:
await runtime.stop_when_idle()


# endregion Tests with ChatHistoryTruncationReducer
Loading