From d5882e0954f1911110346775bed8774da3c6bf99 Mon Sep 17 00:00:00 2001 From: MacMini_Kay Date: Tue, 28 Apr 2026 12:18:34 +0900 Subject: [PATCH] fix(bus): wire injection_prompt in async inter-agent result delivery (rr#18) from_interagent_result() never set envelope.prompt, so bus._process() always skipped injection and delivered raw sub-agent text instead of passing it through the calling agent's active CLI session. Fix: add injection_prompt kwarg to from_interagent_result(); callers (telegram/app.py, matrix/bot.py) build the full [ASYNC INTER-AGENT RESPONSE] prompt and pass it in. bus._process() then calls inject_prompt() as intended, resuming the parent session with the result and replacing result_text with the processed response. Also adds 10 regression tests (test_interagent_inject_rr18.py) covering idle delivery, lock-contention queueing (Main busy scenario), no-prompt raw-deliver path, and concurrent multi-task isolation. Co-Authored-By: Claude Sonnet 4.6 --- ductor_bot/bus/adapters.py | 14 +- ductor_bot/messenger/matrix/bot.py | 28 ++- ductor_bot/messenger/telegram/app.py | 36 ++- tests/bus/test_adapters.py | 16 +- tests/bus/test_interagent_inject_rr18.py | 279 +++++++++++++++++++++++ uv.lock | 2 +- 6 files changed, 368 insertions(+), 7 deletions(-) create mode 100644 tests/bus/test_interagent_inject_rr18.py diff --git a/ductor_bot/bus/adapters.py b/ductor_bot/bus/adapters.py index 2e518171..e1160e79 100644 --- a/ductor_bot/bus/adapters.py +++ b/ductor_bot/bus/adapters.py @@ -141,7 +141,12 @@ def from_webhook_wake(chat_id: int, prompt: str) -> Envelope: # -- Inter-agent --------------------------------------------------------------- -def from_interagent_result(result: AsyncInterAgentResult, chat_id: int) -> Envelope: +def from_interagent_result( + result: AsyncInterAgentResult, + chat_id: int, + *, + injection_prompt: str = "", +) -> Envelope: """Convert an async inter-agent result. 
Uses ``result.chat_id`` / ``result.topic_id`` when available so that @@ -150,6 +155,10 @@ def from_interagent_result(result: AsyncInterAgentResult, chat_id: int) -> Envel Error results are delivered without lock or injection. Success results acquire the lock and inject into the active session. + ``injection_prompt`` must be supplied by the caller (e.g. + ``app.on_async_interagent_result``) so that ``bus._process`` can + actually invoke the CLI injection step. When empty, injection is + skipped and only the raw ``result_text`` is delivered. """ delivery_chat_id = result.chat_id or chat_id meta = { @@ -181,12 +190,13 @@ def from_interagent_result(result: AsyncInterAgentResult, chat_id: int) -> Envel origin=Origin.INTERAGENT, chat_id=delivery_chat_id, topic_id=result.topic_id, + prompt=injection_prompt, prompt_preview=result.message_preview, result_text=result.result_text, status="success", delivery=DeliveryMode.UNICAST, lock_mode=LockMode.REQUIRED, - needs_injection=True, + needs_injection=bool(injection_prompt), elapsed_seconds=result.elapsed_seconds, session_name=result.session_name, metadata=meta, diff --git a/ductor_bot/messenger/matrix/bot.py b/ductor_bot/messenger/matrix/bot.py index d741c149..7795c13f 100644 --- a/ductor_bot/messenger/matrix/bot.py +++ b/ductor_bot/messenger/matrix/bot.py @@ -1206,7 +1206,33 @@ async def on_async_interagent_result(self, result: AsyncInterAgentResult) -> Non text = result.result_text or f"Inter-agent result from {result.recipient}" await self._notification_service.notify_all(text) return - await self._bus.submit(from_interagent_result(result, chat_id)) + + injection_prompt = "" + if result.success: + recipient = result.recipient or result.sender + session_hint = ( + f"\nThe recipient processed this in session `{result.session_name}`. " + f"The user can continue this session via `@{result.session_name} `." 
+ if result.session_name + else "" + ) + task_context = ( + f"\n\nOriginal task you sent to '{recipient}':\n{result.original_message}" + if result.original_message + else "" + ) + injection_prompt = ( + f"[ASYNC INTER-AGENT RESPONSE from '{recipient}'" + f" (task {result.task_id})]\n" + f"{result.result_text}\n" + f"[END ASYNC INTER-AGENT RESPONSE]{session_hint}{task_context}\n\n" + f"You are agent '{self._agent_name}'. Process this response from agent " + f"'{recipient}' and communicate the relevant results to the user." + ) + + await self._bus.submit( + from_interagent_result(result, chat_id, injection_prompt=injection_prompt) + ) async def on_task_result(self, result: TaskResult) -> None: from ductor_bot.bus.adapters import from_task_result diff --git a/ductor_bot/messenger/telegram/app.py b/ductor_bot/messenger/telegram/app.py index 975abcee..de68511e 100644 --- a/ductor_bot/messenger/telegram/app.py +++ b/ductor_bot/messenger/telegram/app.py @@ -1474,7 +1474,41 @@ async def on_async_interagent_result(self, result: AsyncInterAgentResult) -> Non logger.warning("No chat_id available for async interagent result delivery") return set_log_context(operation="ia-async", chat_id=chat_id) - await self._bus.submit(from_interagent_result(result, chat_id)) + + injection_prompt = "" + if result.success: + recipient = result.recipient or result.sender + session_hint = ( + f"\nThe recipient processed this in session `{result.session_name}`. " + f"The user can continue this session in the recipient's Telegram chat " + f"via `@{result.session_name} `." + if result.session_name + else "" + ) + task_context = ( + f"\n\nOriginal task you sent to '{recipient}':\n{result.original_message}" + if result.original_message + else "" + ) + injection_prompt = ( + f"[ASYNC INTER-AGENT RESPONSE from '{recipient}'" + f" (task {result.task_id})]\n" + f"{result.result_text}\n" + f"[END ASYNC INTER-AGENT RESPONSE]{session_hint}{task_context}\n\n" + f"You are agent '{self._agent_name}'. 
Process this response from agent " + f"'{recipient}' and communicate the relevant results to the user " + f"in your Telegram chat." + ) + logger.info( + "ia-async inject: task=%s from=%s prompt_len=%d", + result.task_id, + recipient, + len(injection_prompt), + ) + + await self._bus.submit( + from_interagent_result(result, chat_id, injection_prompt=injection_prompt) + ) async def on_task_result(self, result: TaskResult) -> None: """Handle background task result via the message bus.""" diff --git a/tests/bus/test_adapters.py b/tests/bus/test_adapters.py index d2594a6e..f7fa7d90 100644 --- a/tests/bus/test_adapters.py +++ b/tests/bus/test_adapters.py @@ -160,17 +160,29 @@ def test_from_webhook_wake() -> None: assert env.lock_mode == LockMode.REQUIRED -def test_from_interagent_success() -> None: +def test_from_interagent_success_without_prompt_no_injection() -> None: + """rr#18: without injection_prompt, needs_injection must be False (raw deliver).""" env = from_interagent_result(_FakeInterAgentResult(), chat_id=100) assert env.origin == Origin.INTERAGENT assert env.chat_id == 100 assert env.status == "success" assert env.delivery == DeliveryMode.UNICAST assert env.lock_mode == LockMode.REQUIRED - assert env.needs_injection + assert not env.needs_injection + assert env.prompt == "" assert env.metadata["sender"] == "agent-a" +def test_from_interagent_success_with_prompt_enables_injection() -> None: + """rr#18: injection_prompt wires needs_injection=True and sets envelope.prompt.""" + prompt = "[ASYNC INTER-AGENT RESPONSE from 'dev' (task t1)]\nresult\n[END]" + env = from_interagent_result(_FakeInterAgentResult(), chat_id=100, injection_prompt=prompt) + assert env.origin == Origin.INTERAGENT + assert env.needs_injection + assert env.prompt == prompt + assert env.lock_mode == LockMode.REQUIRED + + def test_from_interagent_error() -> None: env = from_interagent_result(_FakeInterAgentResult(success=False, error="timeout"), chat_id=100) assert env.status == "error" diff 
--git a/tests/bus/test_interagent_inject_rr18.py b/tests/bus/test_interagent_inject_rr18.py new file mode 100644 index 00000000..e1dcaf71 --- /dev/null +++ b/tests/bus/test_interagent_inject_rr18.py @@ -0,0 +1,279 @@ +"""rr#18 regression tests — async inter-agent result injection. + +Verifies that from_interagent_result + MessageBus correctly injects the +inter-agent response into the active CLI session (bus._process calls +injector.inject_prompt when prompt is set). + +Root cause: from_interagent_result() previously left envelope.prompt="" +so bus._process() always skipped injection and delivered raw Dev text. +Fix: caller builds injection_prompt and passes it to the adapter. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ductor_bot.bus.adapters import from_interagent_result +from ductor_bot.bus.bus import MessageBus +from ductor_bot.bus.envelope import LockMode, Origin +from ductor_bot.bus.lock_pool import LockPool + + +# -- Shared fixtures ----------------------------------------------------------- + + +@dataclass +class _FakeIAResult: + task_id: str = "task-rr18" + sender: str = "dev" + recipient: str = "main" + message_preview: str = "glossary task done" + result_text: str = "L3 priority_override implemented" + success: bool = True + error: str | None = None + elapsed_seconds: float = 312.0 + session_name: str = "ia-dev" + provider_switch_notice: str = "" + original_message: str = "implement §13.C glossary" + chat_id: int = 0 + topic_id: int | None = None + + +def _build_injection_prompt(result: _FakeIAResult, agent_name: str = "main") -> str: + recipient = result.recipient or result.sender + session_hint = ( + f"\nThe recipient processed this in session `{result.session_name}`." 
+ if result.session_name + else "" + ) + task_context = ( + f"\n\nOriginal task you sent to '{recipient}':\n{result.original_message}" + if result.original_message + else "" + ) + return ( + f"[ASYNC INTER-AGENT RESPONSE from '{recipient}'" + f" (task {result.task_id})]\n" + f"{result.result_text}\n" + f"[END ASYNC INTER-AGENT RESPONSE]{session_hint}{task_context}\n\n" + f"You are agent '{agent_name}'. Process this response from agent " + f"'{recipient}' and communicate the relevant results to the user " + f"in your Telegram chat." + ) + + +def _mock_transport(name: str = "tg") -> AsyncMock: + t = AsyncMock() + t.transport_name = name + t.deliver = AsyncMock() + t.deliver_broadcast = AsyncMock() + return t + + +# -- Pattern 1: idle — injection fires immediately ---------------------------- + + +async def test_idle_injection_fires(monkeypatch: pytest.MonkeyPatch) -> None: + """Pattern 1: Main idle, ia-async result → inject_prompt called once.""" + bus = MessageBus() + injector = AsyncMock() + injector.inject_prompt = AsyncMock(return_value="processed by main") + bus.set_injector(injector) + bus.register_transport(_mock_transport()) + + result = _FakeIAResult() + prompt = _build_injection_prompt(result) + env = from_interagent_result(result, chat_id=8452932024, injection_prompt=prompt) + + await bus.submit(env) + + injector.inject_prompt.assert_awaited_once() + call_args = injector.inject_prompt.call_args + assert call_args.args[0] == prompt + assert call_args.args[1] == 8452932024 + + +async def test_idle_envelope_result_replaced_by_injected_response() -> None: + """Pattern 1: envelope.result_text is replaced by inject_prompt's return value.""" + bus = MessageBus() + injector = AsyncMock() + injector.inject_prompt = AsyncMock(return_value="main's processed reply") + bus.set_injector(injector) + bus.register_transport(_mock_transport()) + + result = _FakeIAResult() + prompt = _build_injection_prompt(result) + env = from_interagent_result(result, chat_id=8452932024, 
injection_prompt=prompt) + original_raw = env.result_text + + await bus.submit(env) + + assert env.result_text == "main's processed reply" + assert env.result_text != original_raw + + +# -- Pattern 2: Main busy — injection queued behind lock, not dropped ---------- + + +async def test_busy_main_injection_not_dropped() -> None: + """Pattern 2 (rr#18 regression core): when Main lock is held by another + coroutine, the ia-async envelope must wait and eventually inject — + never silently drop. + """ + lock_pool = LockPool() + bus = MessageBus(lock_pool=lock_pool) + + inject_called = asyncio.Event() + injector = AsyncMock() + + async def _inject(prompt: str, chat_id: int, label: str, **_: object) -> str: + inject_called.set() + return "injected while busy" + + injector.inject_prompt = AsyncMock(side_effect=_inject) + bus.set_injector(injector) + bus.register_transport(_mock_transport()) + + chat_lock = lock_pool.get((8452932024, None)) + + # Simulate Main being busy by holding the lock in a background task + lock_held = asyncio.Event() + lock_release = asyncio.Event() + + async def _hold_lock() -> None: + async with chat_lock: + lock_held.set() + await lock_release.wait() + + holder = asyncio.create_task(_hold_lock()) + await lock_held.wait() # lock is now held + + result = _FakeIAResult() + prompt = _build_injection_prompt(result) + env = from_interagent_result(result, chat_id=8452932024, injection_prompt=prompt) + + # Submit ia-async while lock is held — should queue, not drop + submit_task = asyncio.create_task(bus.submit(env)) + + # Give the event loop a moment; inject must NOT have fired yet + await asyncio.sleep(0) + assert not inject_called.is_set(), "inject_prompt fired while lock was held — race!" + + # Release the lock + lock_release.set() + await holder + + # Now the envelope should proceed + await submit_task + + assert inject_called.is_set(), "inject_prompt never called after lock released — silent drop!" 
+ assert env.result_text == "injected while busy" + + +# -- Pattern 3: no prompt → no injection, raw text delivered ------------------ + + +async def test_no_injection_prompt_skips_injection() -> None: + """Raw deliver path: without injection_prompt, inject_prompt is never called.""" + bus = MessageBus() + injector = AsyncMock() + injector.inject_prompt = AsyncMock(return_value="should not be called") + bus.set_injector(injector) + bus.register_transport(_mock_transport()) + + result = _FakeIAResult() + env = from_interagent_result(result, chat_id=8452932024) # no injection_prompt + + await bus.submit(env) + + injector.inject_prompt.assert_not_awaited() + assert env.result_text == result.result_text # raw text unchanged + + +# -- Pattern 4: multiple concurrent tasks, each injected independently --------- + + +async def test_multiple_concurrent_tasks_all_injected() -> None: + """Pattern 4: dev + strategy + reviewer all return async results concurrently. + Each must be injected; none dropped or cross-contaminated. 
+ """ + lock_pool = LockPool() + bus = MessageBus(lock_pool=lock_pool) + + injected: list[str] = [] + + async def _inject(prompt: str, chat_id: int, label: str, **_: object) -> str: + await asyncio.sleep(0) # yield to allow interleaving + injected.append(prompt[:30]) + return f"processed:{prompt[:20]}" + + injector = AsyncMock() + injector.inject_prompt = AsyncMock(side_effect=_inject) + bus.set_injector(injector) + bus.register_transport(_mock_transport()) + + senders = ["dev", "strategy", "reviewer"] + results = [_FakeIAResult(sender=s, task_id=f"task-{s}") for s in senders] + prompts = [_build_injection_prompt(r) for r in results] + envs = [ + from_interagent_result(r, chat_id=8452932024, injection_prompt=p) + for r, p in zip(results, prompts) + ] + + # Submit all three concurrently + await asyncio.gather(*[bus.submit(e) for e in envs]) + + assert len(injected) == 3, f"Expected 3 injections, got {len(injected)}" + assert injector.inject_prompt.await_count == 3 + + +# -- Adapter unit tests for rr#18 fix ----------------------------------------- + + +def test_adapter_prompt_empty_no_injection() -> None: + """from_interagent_result without injection_prompt → needs_injection=False.""" + env = from_interagent_result(_FakeIAResult(), chat_id=100) + assert not env.needs_injection + assert env.prompt == "" + assert env.lock_mode == LockMode.REQUIRED # lock still required + + +def test_adapter_prompt_set_enables_injection() -> None: + """from_interagent_result with injection_prompt → needs_injection=True.""" + prompt = _build_injection_prompt(_FakeIAResult()) + env = from_interagent_result(_FakeIAResult(), chat_id=100, injection_prompt=prompt) + assert env.needs_injection + assert env.prompt == prompt + assert env.origin == Origin.INTERAGENT + assert env.lock_mode == LockMode.REQUIRED + + +def test_adapter_error_result_ignores_injection_prompt() -> None: + """Error results are never injected regardless of injection_prompt.""" + prompt = "should be ignored" + env = 
from_interagent_result( + _FakeIAResult(success=False, error="timeout"), + chat_id=100, + injection_prompt=prompt, + ) + assert env.is_error + assert not env.needs_injection + assert env.lock_mode == LockMode.NONE + + +def test_adapter_prompt_contains_result_text() -> None: + """Prompt built from result must embed the raw result_text.""" + result = _FakeIAResult(result_text="OAuth flow complete: tokens stored") + prompt = _build_injection_prompt(result) + env = from_interagent_result(result, chat_id=100, injection_prompt=prompt) + assert result.result_text in env.prompt + + +def test_adapter_prompt_contains_task_id() -> None: + result = _FakeIAResult(task_id="oauth-task-75") + prompt = _build_injection_prompt(result) + assert "oauth-task-75" in prompt diff --git a/uv.lock b/uv.lock index 71950ef7..4edf8998 100644 --- a/uv.lock +++ b/uv.lock @@ -385,7 +385,7 @@ wheels = [ [[package]] name = "ductor" -version = "0.15.0" +version = "0.16.1" source = { editable = "." } dependencies = [ { name = "aiogram" },