Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions agents/s13_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(self):
self.tasks = {} # task_id -> {status, result, command, started_at}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)

def _record_path(self, task_id: str) -> Path:
return self.dir / f"{task_id}.json"
Expand Down Expand Up @@ -156,14 +157,15 @@ def _execute(self, task_id: str, command: str):
self.tasks[task_id]["finished_at"] = time.time()
self.tasks[task_id]["result_preview"] = preview
self._persist_task(task_id)
with self._lock:
with self._condition:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"preview": preview,
"output_file": str(output_path.relative_to(WORKDIR)),
})
self._condition.notify_all()

def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
Expand All @@ -189,7 +191,22 @@ def check(self, task_id: str = None) -> str:

def drain_notifications(self) -> list:
"""Return and clear all pending completion notifications."""
with self._lock:
with self._condition:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs

def _has_running_tasks_locked(self) -> bool:
return any(task["status"] == "running" for task in self.tasks.values())

def has_running_tasks(self) -> bool:
with self._condition:
return self._has_running_tasks_locked()

def wait_for_notifications(self) -> list:
with self._condition:
while not self._notification_queue and self._has_running_tasks_locked():
self._condition.wait()
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
Expand Down Expand Up @@ -286,24 +303,41 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:
]


def inject_background_results(messages: list, notifs: list) -> bool:
if notifs and messages:
lines = []
for notif in notifs:
suffix = ""
if notif.get("output_file"):
suffix = f" (output_file={notif['output_file']})"
lines.append(
f"[bg:{notif['task_id']}] {notif['status']}: "
f"{notif.get('preview') or '(no output)'}{suffix}"
)
notif_text = "\n".join(lines)
messages.append(
{
"role": "user",
"content": f"<background-results>\n{notif_text}\n</background-results>",
}
)
return True
return False


def agent_loop(messages: list):
while True:
# Drain background notifications and inject as a synthetic user/assistant
# transcript pair before the next model call (teaching demo behavior).
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['preview']} "
f"(output_file={n['output_file']})"
for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
inject_background_results(messages, BG.drain_notifications())
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
if BG.has_running_tasks() and inject_background_results(
messages, BG.wait_for_notifications()
):
continue
return
results = []
for block in response.content:
Expand Down
25 changes: 20 additions & 5 deletions agents/s_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,21 @@ def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> st
]


def inject_background_results(messages: list, notifs: list) -> bool:
if notifs:
txt = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append(
{
"role": "user",
"content": f"<background-results>\n{txt}\n</background-results>",
}
)
return True
return False


# === SECTION: agent_loop ===
def agent_loop(messages: list):
rounds_without_todo = 0
Expand All @@ -752,11 +767,7 @@ def agent_loop(messages: list):
print("[auto-compact triggered]")
messages[:] = auto_compact(messages)
# s08: drain background notifications
notifs = BG.drain()
if notifs:
txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
messages.append({"role": "user", "content": f"<background-results>\n{txt}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
inject_background_results(messages, BG.drain())
# s10: check lead inbox
inbox = BUS.read_inbox("lead")
if inbox:
Expand All @@ -769,6 +780,10 @@ def agent_loop(messages: list):
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
if BG.has_running_tasks() and inject_background_results(
messages, BG.wait_for_notifications()
):
continue
return
# Tool execution
results = []
Expand Down
156 changes: 156 additions & 0 deletions tests/test_background_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import os
import sys
import types
import unittest
from pathlib import Path
from types import SimpleNamespace


REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))

os.environ.setdefault("MODEL_ID", "test-model")

fake_anthropic = types.ModuleType("anthropic")


class FakeAnthropic:
def __init__(self, *args, **kwargs):
self.messages = SimpleNamespace(create=None)


setattr(fake_anthropic, "Anthropic", FakeAnthropic)
sys.modules.setdefault("anthropic", fake_anthropic)

fake_dotenv = types.ModuleType("dotenv")
setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None)
sys.modules.setdefault("dotenv", fake_dotenv)

import agents.s13_background_tasks as s13_background_tasks
import agents.s_full as s_full


class FakeMessagesAPI:
def __init__(self, responses):
self._responses = iter(responses)
self.call_count = 0

def create(self, **kwargs):
self.call_count += 1
return next(self._responses)


class FakeS13BackgroundManager:
def __init__(self):
self._running = True
self.wait_called = False

def drain_notifications(self):
return []

def has_running_tasks(self):
return self._running

def wait_for_notifications(self):
self.wait_called = True
self._running = False
return [
{
"task_id": "bg-1",
"status": "completed",
"preview": "done",
"output_file": ".runtime-tasks/bg-1.log",
}
]


class FakeSFullBackgroundManager:
def __init__(self):
self._running = True
self.wait_called = False

def drain(self):
return []

def has_running_tasks(self):
return self._running

def wait_for_notifications(self):
self.wait_called = True
self._running = False
return [{"task_id": "bg-1", "status": "completed", "result": "done"}]


class BackgroundNotificationTests(unittest.TestCase):
def test_s13_agent_loop_waits_for_background_results_after_end_turn(self):
messages = [{"role": "user", "content": "Run tests in the background"}]
fake_bg = FakeS13BackgroundManager()
fake_api = FakeMessagesAPI(
[
SimpleNamespace(
stop_reason="end_turn", content="Started background work."
),
SimpleNamespace(
stop_reason="end_turn", content="Background work completed."
),
]
)
original_bg = s13_background_tasks.BG
original_client = s13_background_tasks.client
try:
s13_background_tasks.BG = fake_bg
s13_background_tasks.client = SimpleNamespace(messages=fake_api)
s13_background_tasks.agent_loop(messages)
finally:
s13_background_tasks.BG = original_bg
s13_background_tasks.client = original_client

self.assertTrue(fake_bg.wait_called)
self.assertEqual(fake_api.call_count, 2)
self.assertTrue(
any(
message["role"] == "user"
and isinstance(message["content"], str)
and "<background-results>" in message["content"]
for message in messages
)
)

def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self):
messages = [{"role": "user", "content": "Run tests in the background"}]
fake_bg = FakeSFullBackgroundManager()
fake_api = FakeMessagesAPI(
[
SimpleNamespace(
stop_reason="end_turn", content="Started background work."
),
SimpleNamespace(
stop_reason="end_turn", content="Background work completed."
),
]
)
original_bg = s_full.BG
original_client = s_full.client
try:
s_full.BG = fake_bg
s_full.client = SimpleNamespace(messages=fake_api)
s_full.agent_loop(messages)
finally:
s_full.BG = original_bg
s_full.client = original_client

self.assertTrue(fake_bg.wait_called)
self.assertEqual(fake_api.call_count, 2)
self.assertTrue(
any(
message["role"] == "user"
and isinstance(message["content"], str)
and "<background-results>" in message["content"]
for message in messages
)
)


if __name__ == "__main__":
unittest.main()