diff --git a/agents/s13_background_tasks.py b/agents/s13_background_tasks.py index ea19e6dc2..4fc0483d9 100644 --- a/agents/s13_background_tasks.py +++ b/agents/s13_background_tasks.py @@ -91,6 +91,7 @@ def __init__(self): self.tasks = {} # task_id -> {status, result, command, started_at} self._notification_queue = [] # completed task results self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) def _record_path(self, task_id: str) -> Path: return self.dir / f"{task_id}.json" @@ -156,7 +157,7 @@ def _execute(self, task_id: str, command: str): self.tasks[task_id]["finished_at"] = time.time() self.tasks[task_id]["result_preview"] = preview self._persist_task(task_id) - with self._lock: + with self._condition: self._notification_queue.append({ "task_id": task_id, "status": status, @@ -164,6 +165,7 @@ def _execute(self, task_id: str, command: str): "preview": preview, "output_file": str(output_path.relative_to(WORKDIR)), }) + self._condition.notify_all() def check(self, task_id: str = None) -> str: """Check status of one task or list all.""" @@ -189,7 +191,22 @@ def check(self, task_id: str = None) -> str: def drain_notifications(self) -> list: """Return and clear all pending completion notifications.""" - with self._lock: + with self._condition: + notifs = list(self._notification_queue) + self._notification_queue.clear() + return notifs + + def _has_running_tasks_locked(self) -> bool: + return any(task["status"] == "running" for task in self.tasks.values()) + + def has_running_tasks(self) -> bool: + with self._condition: + return self._has_running_tasks_locked() + + def wait_for_notifications(self) -> list: + with self._condition: + while not self._notification_queue and self._has_running_tasks_locked(): + self._condition.wait() notifs = list(self._notification_queue) self._notification_queue.clear() return notifs @@ -286,24 +303,41 @@ def run_edit(path: str, old_text: str, new_text: str) -> str: ] +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs and messages: + lines = [] + for notif in notifs: + suffix = "" + if notif.get("output_file"): + suffix = f" (output_file={notif['output_file']})" + lines.append( + f"[bg:{notif['task_id']}] {notif['status']}: " + f"{notif.get('preview') or '(no output)'}{suffix}" + ) + notif_text = "\n".join(lines) + messages.append( + { + "role": "user", + "content": f"\n{notif_text}\n", + } + ) + return True + return False + + def agent_loop(messages: list): while True: - # Drain background notifications and inject as a synthetic user/assistant - # transcript pair before the next model call (teaching demo behavior). - notifs = BG.drain_notifications() - if notifs and messages: - notif_text = "\n".join( - f"[bg:{n['task_id']}] {n['status']}: {n['preview']} " - f"(output_file={n['output_file']})" - for n in notifs - ) - messages.append({"role": "user", "content": f"\n{notif_text}\n"}) + inject_background_results(messages, BG.drain_notifications()) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results( + messages, BG.wait_for_notifications() + ): + continue return results = [] for block in response.content: diff --git a/agents/s_full.py b/agents/s_full.py index 42eaddbbf..ada23a39e 100644 --- a/agents/s_full.py +++ b/agents/s_full.py @@ -742,6 +742,21 @@ def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> st ] +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs: + txt = "\n".join( + f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs + ) + messages.append( + { + "role": "user", + "content": f"\n{txt}\n", + } + ) + return True + return False + + # === SECTION: agent_loop === def agent_loop(messages: list): rounds_without_todo = 0 @@ -752,11 +767,7 @@ def agent_loop(messages: list): print("[auto-compact triggered]") messages[:] = auto_compact(messages) # s08: drain background notifications - notifs = BG.drain() - if notifs: - txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs) - messages.append({"role": "user", "content": f"\n{txt}\n"}) - messages.append({"role": "assistant", "content": "Noted background results."}) + inject_background_results(messages, BG.drain()) # s10: check lead inbox inbox = BUS.read_inbox("lead") if inbox: @@ -769,6 +780,10 @@ def agent_loop(messages: list): ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results( + messages, BG.wait_for_notifications() + ): + continue return # Tool execution results = [] diff --git a/tests/test_background_notifications.py b/tests/test_background_notifications.py new file mode 100644 index 000000000..b321c888c --- /dev/null +++ b/tests/test_background_notifications.py @@ -0,0 +1,156 @@ +import os +import sys +import types +import unittest +from pathlib import Path +from types import SimpleNamespace + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +os.environ.setdefault("MODEL_ID", "test-model") + +fake_anthropic = types.ModuleType("anthropic") + + +class FakeAnthropic: + def __init__(self, *args, **kwargs): + self.messages = SimpleNamespace(create=None) + + +setattr(fake_anthropic, "Anthropic", FakeAnthropic) +sys.modules.setdefault("anthropic", fake_anthropic) + +fake_dotenv = types.ModuleType("dotenv") +setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None) +sys.modules.setdefault("dotenv", fake_dotenv) + +import agents.s13_background_tasks as s13_background_tasks +import agents.s_full as s_full + + +class FakeMessagesAPI: + def __init__(self, responses): + self._responses = iter(responses) + self.call_count = 0 + + def create(self, **kwargs): + self.call_count += 1 + return next(self._responses) + + +class FakeS13BackgroundManager: + def __init__(self): + self._running = True + self.wait_called = False + + def drain_notifications(self): + return [] + + def has_running_tasks(self): + return self._running + + def wait_for_notifications(self): + self.wait_called = True + self._running = False + return [ + { + "task_id": "bg-1", + "status": "completed", + "preview": "done", + "output_file": ".runtime-tasks/bg-1.log", + } + ] + + +class FakeSFullBackgroundManager: + def __init__(self): + self._running = True + self.wait_called = False + + def drain(self): + return [] + + def has_running_tasks(self): + return self._running + + def wait_for_notifications(self): + self.wait_called = True + self._running = False + return [{"task_id": "bg-1", "status": "completed", "result": "done"}] + + +class BackgroundNotificationTests(unittest.TestCase): + def test_s13_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeS13BackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s13_background_tasks.BG + original_client = s13_background_tasks.client + try: + s13_background_tasks.BG = fake_bg + s13_background_tasks.client = SimpleNamespace(messages=fake_api) + s13_background_tasks.agent_loop(messages) + finally: + s13_background_tasks.BG = original_bg + s13_background_tasks.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeSFullBackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s_full.BG + original_client = s_full.client + try: + s_full.BG = fake_bg + s_full.client = SimpleNamespace(messages=fake_api) + s_full.agent_loop(messages) + finally: + s_full.BG = original_bg + s_full.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + +if __name__ == "__main__": + unittest.main()