Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions agents/s08_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ class BackgroundManager:
def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
self._tasks_lock = threading.Lock() # protect self.tasks
self._queue_lock = threading.Lock() # protect _notification_queue

def run(self, command: str) -> str:
"""Start a background thread, return task_id immediately."""
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {"status": "running", "result": None, "command": command}
with self._tasks_lock:
self.tasks[task_id] = {"status": "running", "result": None, "command": command}
thread = threading.Thread(
target=self._execute, args=(task_id, command), daemon=True
)
Expand All @@ -78,9 +80,10 @@ def _execute(self, task_id: str, command: str):
except Exception as e:
output = f"Error: {e}"
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._lock:
with self._tasks_lock:
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._queue_lock:
self._notification_queue.append({
"task_id": task_id,
"status": status,
Expand All @@ -90,19 +93,20 @@ def _execute(self, task_id: str, command: str):

def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
if task_id:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
lines = []
for tid, t in self.tasks.items():
lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
with self._tasks_lock:
if task_id:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
lines = []
for tid, t in self.tasks.items():
lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
return "\n".join(lines) if lines else "No background tasks."

def drain_notifications(self) -> list:
"""Return and clear all pending completion notifications."""
with self._lock:
with self._queue_lock:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
Expand Down