From ffdd3ffda3e9dc815615b9b0cadfd4c37cb73cb4 Mon Sep 17 00:00:00 2001 From: Anirudh Gangwal Date: Tue, 12 Aug 2025 00:12:09 +0100 Subject: [PATCH 1/2] Add ChatHistoryReducer support to GroupChatOrchestration Enable GroupChatOrchestration to accept ChatHistoryReducer instances for context management in multi-agent conversations. - Add GroupChatHistorySyncMessage for history synchronization - Support chat_history parameter in GroupChatManagerActor and GroupChatOrchestration - Implement async/sync handling for reducers with add_message_async - Add 10ms delay to resolve race condition in concurrent message handlers - Maintain backward compatibility with existing ChatHistory usage --- .../agents/orchestration/group_chat.py | 81 ++++++++++++++++--- 1 file changed, 71 insertions(+), 10 deletions(-) diff --git a/python/semantic_kernel/agents/orchestration/group_chat.py b/python/semantic_kernel/agents/orchestration/group_chat.py index 65c4640e72a5..781f72ea25d1 100644 --- a/python/semantic_kernel/agents/orchestration/group_chat.py +++ b/python/semantic_kernel/agents/orchestration/group_chat.py @@ -57,6 +57,13 @@ class GroupChatResponseMessage(KernelBaseModel): body: ChatMessageContent +@experimental +class GroupChatHistorySyncMessage(KernelBaseModel): + """A message type to synchronize chat history from manager to agents.""" + + history: list[ChatMessageContent] + + _TGroupChatManagerResult = TypeVar("_TGroupChatManagerResult", ChatMessageContent, str, bool) @@ -119,6 +126,14 @@ async def _handle_response_message(self, message: GroupChatResponseMessage, ctx: logger.debug(f"{self.id}: Received group chat response message.") self._message_cache.add_message(message.body) + @message_handler + async def _handle_history_sync_message(self, message: GroupChatHistorySyncMessage, ctx: MessageContext) -> None: + """Handle chat history synchronization from the manager.""" + logger.debug(f"{self.id}: Received group chat history sync message with {len(message.history)} messages.") + self._message_cache.clear() + for msg in message.history: + self._message_cache.add_message(msg) + @message_handler async def _handle_request_message(self, message: GroupChatRequestMessage, ctx: MessageContext) -> None: if message.agent_name != self._agent.name: @@ -259,6 +274,7 @@ def __init__( participant_descriptions: dict[str, str], exception_callback: Callable[[BaseException], None], result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None, + chat_history: ChatHistory | None = None, ): """Initialize the group chat manager actor. @@ -268,40 +284,74 @@ def __init__( participant_descriptions (dict[str, str]): The descriptions of the participants in the group chat. exception_callback (Callable[[BaseException], None]): A function that is called when an exception occurs. result_callback (Callable | None): A function that is called when the group chat manager produces a result. + chat_history (ChatHistory | None): A chat history instance to use. If None, creates a new ChatHistory. + This can be a ChatHistoryReducer subclass for context management. """ self._manager = manager self._internal_topic_type = internal_topic_type - self._chat_history = ChatHistory() + self._chat_history = chat_history or ChatHistory() self._participant_descriptions = participant_descriptions self._result_callback = result_callback super().__init__("An actor for the group chat manager.", exception_callback) + async def _sync_history_to_agents(self, cancellation_token: CancellationToken) -> None: + """Synchronize the manager's chat history to all agents.""" + # FIXED: Publish sync message and add small delay to ensure processing completes + # before request message is published (avoids race condition in concurrent handlers) + await self.publish_message( + GroupChatHistorySyncMessage(history=self._chat_history.messages), + TopicId(self._internal_topic_type, self.id.key), + cancellation_token=cancellation_token, + ) + + # Small delay to allow sync message processing to complete + # This ensures agent caches are populated before request processing + await asyncio.sleep(0.01) # 10ms should be sufficient for message processing + @message_handler async def _handle_start_message(self, message: GroupChatStartMessage, ctx: MessageContext) -> None: """Handle the start message for the group chat.""" logger.debug(f"{self.id}: Received group chat start message.") if isinstance(message.body, ChatMessageContent): - self._chat_history.add_message(message.body) + # Use add_message_async for reducers, fallback to add_message for regular ChatHistory + if hasattr(self._chat_history, "add_message_async"): + await self._chat_history.add_message_async(message.body) + else: + self._chat_history.add_message(message.body) elif isinstance(message.body, list) and all(isinstance(m, ChatMessageContent) for m in message.body): for m in message.body: - self._chat_history.add_message(m) + if hasattr(self._chat_history, "add_message_async"): + await self._chat_history.add_message_async(m) + else: + self._chat_history.add_message(m) else: raise ValueError(f"Invalid message body type: {type(message.body)}. Expected {DefaultTypeAlias}.") + # Sync history to agents after adding messages + await self._sync_history_to_agents(ctx.cancellation_token) await self._determine_state_and_take_action(ctx.cancellation_token) @message_handler async def _handle_response_message(self, message: GroupChatResponseMessage, ctx: MessageContext) -> None: if message.body.role != AuthorRole.USER: - self._chat_history.add_message( - ChatMessageContent( - role=AuthorRole.USER, - content=f"Transferred to {message.body.name}", - ) + transfer_message = ChatMessageContent( + role=AuthorRole.USER, + content=f"Transferred to {message.body.name}", ) - self._chat_history.add_message(message.body) + if hasattr(self._chat_history, "add_message_async"): + await self._chat_history.add_message_async(transfer_message) + else: + self._chat_history.add_message(transfer_message) + + # Add the main message + if hasattr(self._chat_history, "add_message_async"): + await self._chat_history.add_message_async(message.body) + else: + self._chat_history.add_message(message.body) + # Sync history to agents after adding messages + await self._sync_history_to_agents(ctx.cancellation_token) await self._determine_state_and_take_action(ctx.cancellation_token) @ActorBase.exception_handler @@ -314,7 +364,13 @@ async def _determine_state_and_take_action(self, cancellation_token: Cancellatio if should_request_user_input.result and self._manager.human_response_function: logger.debug(f"Group chat manager requested user input. Reason: {should_request_user_input.reason}") user_input_message = await self._call_human_response_function() - self._chat_history.add_message(user_input_message) + # Add user input message with proper async handling for reducers + if hasattr(self._chat_history, "add_message_async"): + await self._chat_history.add_message_async(user_input_message) + else: + self._chat_history.add_message(user_input_message) + # Sync history to agents after adding user input + await self._sync_history_to_agents(cancellation_token) await self.publish_message( GroupChatResponseMessage(body=user_input_message), TopicId(self._internal_topic_type, self.id.key), @@ -377,6 +433,7 @@ def __init__( agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None, streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None] | None = None, + chat_history: ChatHistory | None = None, ) -> None: """Initialize the group chat orchestration. @@ -392,8 +449,11 @@ def __init__( by the agents. streaming_agent_response_callback (Callable | None): A function that is called when a streaming response is produced by the agents. + chat_history (ChatHistory | None): A chat history instance to use for the manager. If None, creates a new + ChatHistory. This can be a ChatHistoryReducer subclass for context management. """ self._manager = manager + self._chat_history = chat_history for member in members: if member.description is None: @@ -496,6 +556,7 @@ async def _register_manager( participant_descriptions={agent.name: agent.description for agent in self._members}, # type: ignore[misc] exception_callback=exception_callback, result_callback=result_callback, + chat_history=self._chat_history, ), ) From e562bc1b974e030e20cfd6ad73c0e923bb9211c9 Mon Sep 17 00:00:00 2001 From: Anirudh Gangwal Date: Tue, 12 Aug 2025 00:57:05 +0100 Subject: [PATCH 2/2] Apply ruff formatting --- .../agents/orchestration/group_chat.py | 2 +- .../test_group_chat_with_reducers.py | 139 ++++++++++++++++++ 2 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 python/tests/unit/agents/orchestration/test_group_chat_with_reducers.py diff --git a/python/semantic_kernel/agents/orchestration/group_chat.py b/python/semantic_kernel/agents/orchestration/group_chat.py index 781f72ea25d1..f1fc1677e49a 100644 --- a/python/semantic_kernel/agents/orchestration/group_chat.py +++ b/python/semantic_kernel/agents/orchestration/group_chat.py @@ -289,7 +289,7 @@ def __init__( """ self._manager = manager self._internal_topic_type = internal_topic_type - self._chat_history = chat_history or ChatHistory() + self._chat_history = chat_history if chat_history is not None else ChatHistory() self._participant_descriptions = participant_descriptions self._result_callback = result_callback diff --git a/python/tests/unit/agents/orchestration/test_group_chat_with_reducers.py b/python/tests/unit/agents/orchestration/test_group_chat_with_reducers.py new file mode 100644 index 000000000000..ee4423131206 --- /dev/null +++ b/python/tests/unit/agents/orchestration/test_group_chat_with_reducers.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft. All rights reserved. + +import sys + +from semantic_kernel.agents.orchestration.group_chat import ( + GroupChatOrchestration, + RoundRobinGroupChatManager, +) +from semantic_kernel.agents.runtime.in_process.in_process_runtime import InProcessRuntime +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.history_reducer.chat_history_truncation_reducer import ChatHistoryTruncationReducer +from tests.unit.agents.orchestration.conftest import MockAgent + +if sys.version_info >= (3, 12): + pass # pragma: no cover +else: + pass # pragma: no cover + + +# region Tests with ChatHistoryTruncationReducer + + +async def test_group_chat_with_chat_history_parameter(): + """Test GroupChatOrchestration accepts chat_history parameter.""" + agent = MockAgent(name="TestAgent", description="Test agent") + chat_history = ChatHistory() + + # Should not raise an error + orchestration = GroupChatOrchestration( + members=[agent], manager=RoundRobinGroupChatManager(), chat_history=chat_history + ) + + assert orchestration._chat_history is chat_history + + +async def test_group_chat_with_truncation_reducer(): + """Test GroupChatOrchestration with ChatHistoryTruncationReducer.""" + agent = MockAgent(name="TestAgent", description="Test agent") + + # Create truncation reducer that keeps only 2 messages + reducer = ChatHistoryTruncationReducer(target_count=2, threshold_count=0, auto_reduce=True) + + runtime = InProcessRuntime() + runtime.start() + + try: + orchestration = GroupChatOrchestration( + members=[agent], manager=RoundRobinGroupChatManager(max_rounds=3), chat_history=reducer + ) + + orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime) + await orchestration_result.get(timeout=1.0) + + # Verify the reducer was used and has messages + assert len(reducer.messages) > 0, "Reducer should have received messages" + + # Verify truncation is working - should not exceed target_count + threshold_count + buffer + # (The exact count may vary due to "transfer" messages and timing) + assert len(reducer.messages) <= 5, "Reducer should limit message count" + + finally: + await runtime.stop_when_idle() + + +async def test_group_chat_fallback_to_regular_chat_history(): + """Test that GroupChat falls back to add_message for regular ChatHistory.""" + agent = MockAgent(name="TestAgent", description="Test agent") + + # Use regular ChatHistory (no add_message_async method) + regular_history = ChatHistory() + + runtime = InProcessRuntime() + runtime.start() + + try: + # Should work without errors + orchestration = GroupChatOrchestration( + members=[agent], manager=RoundRobinGroupChatManager(max_rounds=1), chat_history=regular_history + ) + + orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime) + await orchestration_result.get(timeout=1.0) + + # Verify messages were added to history + assert len(regular_history.messages) > 0, "Messages should be added to regular ChatHistory" + + finally: + await runtime.stop_when_idle() + + +async def test_group_chat_no_chat_history_parameter(): + """Test GroupChatOrchestration works when no chat_history provided.""" + agent = MockAgent(name="TestAgent", description="Test agent") + + orchestration = GroupChatOrchestration(members=[agent], manager=RoundRobinGroupChatManager()) + + # Should not set chat_history in constructor when None provided + assert orchestration._chat_history is None + + +async def test_empty_reducer_not_replaced_by_default_history(): + """Test that empty ChatHistoryReducer instances are not replaced by default ChatHistory. + + This is a regression test for the bug where empty reducers evaluated to False + and were replaced by ChatHistory() due to the 'chat_history or ChatHistory()' pattern. + """ + agent = MockAgent(name="TestAgent", description="Test agent") + + # Create empty reducer (should evaluate to False in boolean context but still be used) + empty_reducer = ChatHistoryTruncationReducer(target_count=5, threshold_count=0, auto_reduce=True) + + # Verify it's empty and evaluates to False initially + assert len(empty_reducer.messages) == 0 + assert bool(empty_reducer) is False + + runtime = InProcessRuntime() + runtime.start() + + try: + orchestration = GroupChatOrchestration( + members=[agent], manager=RoundRobinGroupChatManager(max_rounds=1), chat_history=empty_reducer + ) + + orchestration_result = await orchestration.invoke(task="Test message", runtime=runtime) + await orchestration_result.get(timeout=1.0) + + # Verify messages were added to our reducer (not a default ChatHistory) + assert len(empty_reducer.messages) > 0, "Messages should be added to the reducer instance" + + # Verify we can access reducer-specific functionality (proves it's our reducer) + assert hasattr(empty_reducer, "auto_reduce"), "Should still be our reducer instance" + assert empty_reducer.auto_reduce is True, "Should have reducer-specific properties" + assert empty_reducer.target_count == 5, "Should have our original target_count" + + finally: + await runtime.stop_when_idle() + + +# endregion Tests with ChatHistoryTruncationReducer