diff --git a/orchestrator/api/harness_commands.py b/orchestrator/api/harness_commands.py new file mode 100644 index 000000000..65284654e --- /dev/null +++ b/orchestrator/api/harness_commands.py @@ -0,0 +1,307 @@ +"""HARNESS self-management command handler (PRD-141 US-025). + +Processes inbound ``/approve`` and ``/reject`` commands for queued high-risk +HARNESS prescriptions (the ones US-024 escalates to a channel). Every command +is gated behind two checks, in this order, BEFORE any state is touched: + + 1. ``HARNESS_SELF_MANAGEMENT_ENABLED`` — Phase 5 stays inert by default, so a + disabled platform answers every command with a plain "disabled" message + and changes nothing (mirrors ``_apply_approved_board_tasks``). + 2. A workspace-ADMIN authorization check on the *calling user*. A caller who + is not an active ``owner``/``admin`` member of the workspace can never + mutate state — the command is refused before any task or agent is read or + written. This is the security boundary US-026 reviews. + +``/approve`` short-circuits the weekly tick: it applies the prescription now via +the existing ``HarnessService._auto_apply_prescription`` and records the board +task id in the shared US-021 ledger, so the next tick never re-applies it. +``/reject`` marks the board task ``rejected`` so ``_get_rejected_signatures`` +suppresses the same prescription on future ticks. + +Wiring into the live channel inbound path is done in US-026 (the gate that +enables the flag on a canary workspace and exercises this path end-to-end). 
+""" +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any, Dict, Optional +from uuid import UUID + +from services.harness_service import get_harness_service + +logger = logging.getLogger(__name__) + + +def _make_executor(db, workspace_id: UUID): + """Construct a PlatformActionExecutor (lazy import keeps the heavy action / + RAG dependency chain out of module load — and is the test injection seam).""" + from modules.tools.discovery.platform_executor import PlatformActionExecutor + + return PlatformActionExecutor(db=db, workspace_id=workspace_id) + + +def _caller_user_id(caller_identity: Optional[Dict[str, Any]]) -> Optional[int]: + """Extract a positive integer platform user_id from caller_identity, or None. + + Fail-closed and deterministic: rejects a non-dict identity, a missing/None + user_id, a bool (``True`` would otherwise coerce to user 1), a non-numeric + or infinite value, and any non-positive id. Centralising the coercion here means the + authz query and the audit fields all see the exact same validated id rather + than relying on DB-driver coercion of caller-controlled input. + """ + if not isinstance(caller_identity, dict): + return None + raw = caller_identity.get("user_id") + if raw is None or isinstance(raw, bool): + return None + try: + uid = int(raw) + except (TypeError, ValueError, OverflowError): + return None + return uid if uid > 0 else None + + +def _caller_is_workspace_admin( + db, workspace_id: UUID, caller_identity: Optional[Dict[str, Any]] +) -> bool: + """Return True only if caller_identity is an active owner/admin member. + + The single authorization primitive for every HARNESS command mutation, and + deliberately fail-closed: a non-dict identity, a missing ``user_id``, an + unknown user, a non-admin role, or any error all yield False. Mirrors the + membership check the PlatformActionExecutor admin gate uses, but keyed to + the *specific* calling user rather than "the workspace has some admin". 
+ """ + user_id = _caller_user_id(caller_identity) + if user_id is None: + return False + try: + from core.workspaces.models import WorkspaceMember + + member = ( + db.query(WorkspaceMember) + .filter( + WorkspaceMember.workspace_id == workspace_id, + WorkspaceMember.user_id == user_id, + WorkspaceMember.role.in_(("owner", "admin")), + WorkspaceMember.is_active.is_(True), + ) + .first() + ) + return member is not None + except Exception: + logger.exception( + "[HARNESS] authorization check failed for workspace=%s user=%s", + workspace_id, user_id, + ) + return False + + +def _find_task_by_rx(tasks: Any, rx_id: str) -> Optional[Dict[str, Any]]: + """Find the queued HARNESS board task carrying the ``rx:{rx_id}`` tag. + + US-024 tags every queued prescription with ``harness`` and ``rx:{rx_id}``; + the lookup keys on both so an unrelated ``harness`` task can never match. + """ + rx_tag = f"rx:{rx_id}" + if not isinstance(tasks, list): + return None + for task in tasks: + if not isinstance(task, dict): + continue + tags = task.get("tags") or [] + if "harness" in tags and rx_tag in tags: + return task + return None + + +async def handle_harness_command( + db, + workspace_id: UUID, + command: str, + rx_id: str, + caller_identity: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + """Process a ``/approve`` or ``/reject`` command for a HARNESS prescription. + + Returns ``{"success": bool, "message": str, ...}``. Authorization is enforced + FIRST: an unauthorized caller is refused before any state is read or written. + """ + from config import config + + # Phase 5 stays dark unless explicitly enabled. No command does anything. + if not config.HARNESS_SELF_MANAGEMENT_ENABLED: + return {"success": False, "message": "HARNESS self-management is disabled"} + + # Authorization is non-negotiable and happens before ANY mutation. 
+ if not _caller_is_workspace_admin(db, workspace_id, caller_identity): + logger.warning( + "[HARNESS] Unauthorized '%s' for rx=%s in workspace=%s by user=%s", + command, rx_id, workspace_id, + (caller_identity or {}).get("user_id", "unknown") + if isinstance(caller_identity, dict) else "unknown", + ) + return { + "success": False, + "unauthorized": True, + "message": "Only a workspace admin can approve or reject HARNESS changes", + } + + cmd = (command or "").lstrip("/").strip().lower() + if cmd not in ("approve", "reject"): + return {"success": False, "message": f"Unknown command: {command}"} + + if not rx_id: + return {"success": False, "message": "Missing prescription id"} + + svc = get_harness_service() + executor = _make_executor(db, workspace_id) + + # Locate the queued board task for this prescription (tag-keyed). + list_result = await executor.execute("platform_list_tasks", {"tags": ["harness"]}) + tasks: Any = [] + if isinstance(list_result, dict): + tasks = list_result.get("data", list_result.get("tasks", [])) or [] + task = _find_task_by_rx(tasks, rx_id) + if not task: + return { + "success": False, + "message": f"No pending HARNESS change found for {rx_id}", + } + + # A rejected prescription must never be resurrected by a later /approve: + # the board task keeps its 'harness'/'rx:' tags after rejection, so it still + # surfaces in the tag-filtered list and _find_task_by_rx still matches it. + # /reject of an already-rejected task is a harmless idempotent no-op. 
+ if cmd == "approve" and str(task.get("status") or "").lower() == "rejected": + return { + "success": False, + "message": f"{rx_id} was already rejected and cannot be approved", + } + + if cmd == "approve": + return await _approve(svc, executor, workspace_id, task, rx_id, caller_identity) + return await _reject(db, workspace_id, task, rx_id, caller_identity) + + +async def _approve( + svc, + executor: Any, + workspace_id: UUID, + task: Dict[str, Any], + rx_id: str, + caller_identity: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + """Apply an approved prescription now and record it in the US-021 ledger.""" + task_id = str(task.get("id")) + user_id = _caller_user_id(caller_identity) + + # Idempotency: the shared applied-tasks ledger is the source of truth, so a + # second /approve (or a later tick) never re-applies the same change. + ledger = svc._read_applied_tasks(workspace_id) + applied_ids = {str(i) for i in ledger.get("applied_task_ids", [])} + if task_id in applied_ids: + return { + "success": True, + "already_applied": True, + "message": f"{rx_id} was already applied", + } + + agents_by_name = await svc._resolve_agents_by_name(executor) + rx = svc._parse_harness_task(task, agents_by_name=agents_by_name) + if not rx or rx.get("target_id") is None: + # Never apply a change to an unresolved / guessed target. + return { + "success": False, + "message": f"Could not resolve the target for {rx_id}; nothing applied", + } + + current_before = svc._snapshot_current_value(rx) + apply_result = await svc._auto_apply_prescription(executor, rx) + if not apply_result.get("success"): + return { + "success": False, + "message": f"Failed to apply {rx_id}: {apply_result.get('error', 'unknown')}", + } + + # Apply succeeded — only now mark the board task done (so a failed apply + # never leaves a task falsely completed). 'done' is a valid action status, + # so this also sets completed_at via the handler. 
+ await executor.execute( + "platform_update_task_status", {"task_id": task.get("id"), "status": "done"} + ) + + entry = { + "task_id": task_id, + "prescription_id": rx.get("prescription_id"), + "target_id": rx.get("target_id"), + "target_name": rx.get("target_name"), + "change_type": rx.get("change_type"), + "current_value_before": current_before, + "proposed_value": rx.get("proposed_value"), + "applied_at": datetime.now(timezone.utc).isoformat(), + "approved_via": "command", + "approved_by": user_id, + } + applied_ids.add(task_id) + svc._write_applied_tasks(workspace_id, ledger, applied_ids, [entry]) + + logger.info( + "[HARNESS] APPROVED rx=%s (%s for %s) in workspace=%s by user=%s", + rx_id, rx.get("change_type"), rx.get("target_name"), workspace_id, user_id, + ) + return { + "success": True, + "message": f"Applied {rx.get('change_type')} for {rx.get('target_name')}", + "change_type": rx.get("change_type"), + "target_name": rx.get("target_name"), + } + + +async def _reject( + db, + workspace_id: UUID, + task: Dict[str, Any], + rx_id: str, + caller_identity: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + """Mark the board task rejected so it is never proposed or applied again.""" + from core.models.core import BoardTask + + raw_id = task.get("id") + try: + row = ( + db.query(BoardTask) + .filter( + BoardTask.id == int(raw_id), + BoardTask.workspace_id == workspace_id, + ) + .first() + ) + except (TypeError, ValueError): + row = None + if not row: + return { + "success": False, + "message": f"No pending HARNESS change found for {rx_id}", + } + + # The board action layer only permits the five kanban statuses, so set + # 'rejected' on the ORM row directly (this is internal server code, not an + # LLM tool call). The title is left untouched: it carries the signature + # _get_rejected_signatures matches on to suppress re-proposal next tick. 
+ user_id = _caller_user_id(caller_identity) + row.status = "rejected" + row.blocked_reason = f"Rejected by workspace admin (user {user_id}) via command" + row.blocked_at = datetime.now(timezone.utc) + db.commit() + + logger.info( + "[HARNESS] REJECTED rx=%s in workspace=%s by user=%s", rx_id, workspace_id, user_id, + ) + return { + "success": True, + "rejected": True, + "message": f"Rejected {rx_id}; HARNESS will not propose it again", + } diff --git a/orchestrator/modules/tools/discovery/handlers_board_tasks.py b/orchestrator/modules/tools/discovery/handlers_board_tasks.py index 6906c3732..695e235bb 100644 --- a/orchestrator/modules/tools/discovery/handlers_board_tasks.py +++ b/orchestrator/modules/tools/discovery/handlers_board_tasks.py @@ -120,6 +120,16 @@ async def list_board_tasks(db: Session, workspace_id: UUID, params: Dict[str, An if priority: query = query.filter(BoardTask.priority == priority) + # Tag filter — return tasks whose tags include ALL requested tags. BoardTask.tags + # is JSONB, so .contains([...]) compiles to `tags @> [...]` (array containment). + # HARNESS self-management relies on this to find its own '[HARNESS]' tasks + # (tagged 'harness' + 'rx:{id}'); without it the lookup returns unrelated tasks. 
+ tags = params.get("tags") + if isinstance(tags, str): + tags = [tags] + if isinstance(tags, list) and tags: + query = query.filter(BoardTask.tags.contains(tags)) + agent_name = params.get("assigned_agent_name") if agent_name: from core.models import Agent @@ -149,8 +159,10 @@ async def list_board_tasks(db: Session, workspace_id: UUID, params: Dict[str, An result.append({ "id": t.id, "title": t.title, + "description": t.description, "status": t.status, "priority": t.priority, + "tags": t.tags or [], "assigned_agent": agents_map.get(t.assigned_agent_id, "unassigned"), "created_at": str(t.created_at) if t.created_at else None, "started_at": str(t.started_at) if t.started_at else None, diff --git a/orchestrator/services/harness_service.py b/orchestrator/services/harness_service.py index 7542f7477..e711834bf 100644 --- a/orchestrator/services/harness_service.py +++ b/orchestrator/services/harness_service.py @@ -753,6 +753,7 @@ async def _phase_apply( "applied": [], "queued": [], "failed": [], + "escalated": [], } if not prescriptions: @@ -800,7 +801,9 @@ async def _phase_apply( f"**Rationale:** {rx.get('rationale', '')}\n\n" f"**Expected Improvement:** {rx.get('expected_improvement', '')}" ), - "tags": ["harness", "org-review", f"risk-{risk}"], + # rx:{rx_id} lets US-025 /approve resolve the board task + # back to its prescription by tag. + "tags": ["harness", "org-review", f"risk-{risk}", f"rx:{rx_id}"], "priority": priority, }) changelog["queued"].append({ @@ -809,6 +812,9 @@ async def _phase_apply( "change_type": change_type, "board_task_id": task_result.get("data", {}).get("id") if isinstance(task_result, dict) else None, }) + # US-024: nudge a human for high-risk changes (board task is + # the durable record; the notification is best-effort). 
+ await self._maybe_escalate(db, workspace_id, rx, changelog) except Exception as exc: logger.error("[HARNESS] Failed to queue rx %s: %s", rx_id, exc, exc_info=True) changelog["failed"].append({ @@ -820,11 +826,76 @@ async def _phase_apply( await self._apply_approved_board_tasks(executor, workspace_id, changelog) logger.info( - "[HARNESS] APPLY done — %d applied, %d queued, %d failed", - len(changelog["applied"]), len(changelog["queued"]), len(changelog["failed"]), + "[HARNESS] APPLY done — %d applied, %d queued, %d failed, %d escalated", + len(changelog["applied"]), len(changelog["queued"]), + len(changelog["failed"]), len(changelog.get("escalated", [])), ) return changelog + async def _maybe_escalate( + self, + db: "Session", + workspace_id: UUID, + rx: Dict[str, Any], + changelog: Dict[str, List[Dict[str, Any]]], + ) -> None: + """Notify a human to approve/reject a high-risk queued prescription. + + Only fires for risk >= _HIGH_PRIORITY_RISK, and only when the workspace + has a connected channel — otherwise there is nobody to notify and the + board task already stands for in-app review (so we skip silently rather + than record a phantom escalation). Records an 'escalated' changelog + entry with whether delivery actually succeeded. Errors propagate to the caller. 
+ """ + risk = rx.get("risk_score", 5) + if risk < _HIGH_PRIORITY_RISK: + return + if not self._workspace_has_channel(db, workspace_id): + return + from core.services.notification_service import send_workspace_notification + + notified = await send_workspace_notification( + str(workspace_id), self._build_escalation_message(rx, risk), channel="default" + ) + changelog.setdefault("escalated", []).append({ + "prescription_id": rx.get("prescription_id"), + "target_name": rx.get("target_name", "unknown"), + "change_type": rx.get("change_type", "unknown"), + "risk_score": risk, + "notified": notified, + }) + + def _build_escalation_message(self, rx: Dict[str, Any], risk: int) -> str: + """Plain-text approval request with the /approve|/reject instructions.""" + rx_id = rx.get("prescription_id", "unknown") + return ( + f"HARNESS needs approval (risk {risk}/5)\n" + f"{rx.get('change_type', 'unknown')} for {rx.get('target_name', 'unknown')}\n" + f"Current: {json.dumps(rx.get('current_value', {}))}\n" + f"Proposed: {json.dumps(rx.get('proposed_value', {}))}\n" + f"Reply /approve {rx_id} or /reject {rx_id}" + ) + + def _workspace_has_channel(self, db: "Session", workspace_id: UUID) -> bool: + """True if the workspace has any channel_connections row. + + Matches how channels.sender resolves a channel — by row presence, not + by the status column (which is 'active' in channels.manager but + 'connected' elsewhere) — so 'can escalate' agrees with 'can deliver'. 
+ """ + try: + from sqlalchemy import text + row = db.execute( + text("SELECT 1 FROM channel_connections WHERE workspace_id = :ws LIMIT 1"), + {"ws": str(workspace_id)}, + ).fetchone() + return row is not None + except Exception: + logger.debug( + "[HARNESS] channel-presence check failed for ws=%s", workspace_id, exc_info=True + ) + return False + # ------------------------------------------------------------------ # Phase 5: BASELINE # ------------------------------------------------------------------ @@ -913,10 +984,7 @@ async def _phase_baseline( artifacts: Dict[str, str] = {} for label, path, content in files_to_write: try: - await executor.execute("workspace_write_file", { - "path": path, - "content": content, - }) + self._write_workspace_file(workspace_id, path, content) artifacts[label] = "ok" except Exception as exc: logger.error("[HARNESS] Failed to write %s (%s): %s", label, path, exc, exc_info=True) @@ -1067,6 +1135,30 @@ def _write_last_run( workspace_id, exc, ) + def _write_workspace_file(self, workspace_id: UUID, rel_path: str, content: str) -> None: + """Persist a HARNESS artifact directly under the workspace volume. + + HARNESS reads (_read_baseline / _read_applied_tasks / _read_last_run) go + straight to the workspace volume on disk, so writes MUST hit the same + store. ``workspace_write_file`` is an agent tool-execution primitive, not + a registered platform action, so routing harness writes through + ``executor.execute("workspace_write_file", ...)`` silently returns an + "unknown action" error and the ledger / baseline never persist. Writing + directly (like _write_last_run) keeps reads and writes on one store and + raises on a real I/O error so callers record the failure, never mask it. 
+ """ + from config import config + import os + + abs_path = os.path.join( + config.WORKSPACE_VOLUME_PATH, + str(workspace_id), + rel_path.lstrip("/"), + ) + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + with open(abs_path, "w") as f: + f.write(content) + @staticmethod def _resolve_auto_agent( db: "Session", workspace_id: UUID @@ -1409,8 +1501,8 @@ async def _apply_approved_board_tasks( }) if newly_applied: - await self._write_applied_tasks( - executor, workspace_id, ledger, applied_ids, newly_applied + self._write_applied_tasks( + workspace_id, ledger, applied_ids, newly_applied ) except Exception as exc: # A failure here means human-approved changes were silently dropped — @@ -1499,23 +1591,28 @@ def _read_applied_tasks(self, workspace_id: UUID) -> Dict[str, Any]: ) return {"applied_task_ids": [], "entries": []} - async def _write_applied_tasks( + def _write_applied_tasks( self, - executor: "PlatformActionExecutor", workspace_id: UUID, ledger: Dict[str, Any], applied_ids: set, newly_applied: List[Dict[str, Any]], ) -> None: - """Persist the applied-tasks ledger via the workspace file store.""" + """Persist the applied-tasks ledger to the workspace volume on disk. + + Writes to the same path _read_applied_tasks reads, so the idempotency + key round-trips. See _write_workspace_file for why this is not routed + through the executor. 
+ """ ledger["applied_task_ids"] = sorted(applied_ids) ledger.setdefault("entries", []).extend(newly_applied) ledger["updated_at"] = datetime.now(timezone.utc).isoformat() try: - await executor.execute("workspace_write_file", { - "path": "/harness/applied_tasks.json", - "content": json.dumps(ledger, indent=2), - }) + self._write_workspace_file( + workspace_id, + "/harness/applied_tasks.json", + json.dumps(ledger, indent=2), + ) except Exception as exc: logger.warning( "[HARNESS] Failed to persist applied_tasks ledger for %s: %s", diff --git a/orchestrator/tests/test_board_task_handlers.py b/orchestrator/tests/test_board_task_handlers.py new file mode 100644 index 000000000..cff3774a3 --- /dev/null +++ b/orchestrator/tests/test_board_task_handlers.py @@ -0,0 +1,149 @@ +"""PRD-141: list_board_tasks projection must surface `tags` and `description`. + +The HARNESS command path keys on these two fields: rx_id lives ONLY in a task's +tags (`rx:{id}`), and _parse_harness_task reads Current/Proposed out of the +description. Before this fix the projection dropped both, so _find_task_by_rx +always returned "no pending change" and _parse_harness_task saw an empty body — +the bug only stayed hidden because the unit suites used an in-memory fake +executor instead of this real handler. + +The handler is loaded directly from its file: importing the `modules.tools` +package pulls in the RAG/multimodal chain (camelot) that isn't installed in the +unit env. The function's `from core.models.core import BoardTask` runs at call +time and resolves the real ORM class, so the SQLAlchemy class-attribute +expressions are genuine — the fake query just ignores them. The JSONB `tags` +*filter* (`tags @> [...]`) needs a real Postgres and is exercised at the +US-026 live gate, not here. 
+""" +import asyncio +import importlib.util +import os +import sys +import types +from uuid import UUID + +os.environ.setdefault("POSTGRES_USER", "test") +os.environ.setdefault("POSTGRES_PASSWORD", "test") +os.environ.setdefault("POSTGRES_HOST", "localhost") +os.environ.setdefault("POSTGRES_PORT", "5432") +os.environ.setdefault("POSTGRES_DB", "test") + + +def _install_fake_apscheduler(): + if "apscheduler" in sys.modules: + return + aps = types.ModuleType("apscheduler") + schedulers = types.ModuleType("apscheduler.schedulers") + asyncio_mod = types.ModuleType("apscheduler.schedulers.asyncio") + asyncio_mod.AsyncIOScheduler = type("AsyncIOScheduler", (), {}) + jobstores = types.ModuleType("apscheduler.jobstores") + memory_mod = types.ModuleType("apscheduler.jobstores.memory") + memory_mod.MemoryJobStore = type("MemoryJobStore", (), {}) + aps.schedulers = schedulers + aps.jobstores = jobstores + schedulers.asyncio = asyncio_mod + jobstores.memory = memory_mod + sys.modules.update({ + "apscheduler": aps, + "apscheduler.schedulers": schedulers, + "apscheduler.schedulers.asyncio": asyncio_mod, + "apscheduler.jobstores": jobstores, + "apscheduler.jobstores.memory": memory_mod, + }) + + +_install_fake_apscheduler() + + +def _load_handler(): + """Load handlers_board_tasks.py directly, skipping the package __init__.""" + here = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + path = os.path.join(here, "modules", "tools", "discovery", "handlers_board_tasks.py") + spec = importlib.util.spec_from_file_location("hbt_under_test", path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +_HANDLER = _load_handler() +_WS_ID = UUID("00000000-0000-0000-0000-000000000001") + + +class _FakeRow: + """A BoardTask-shaped row carrying exactly the attrs the projection reads.""" + + def __init__(self, **kw): + self.id = kw.get("id") + self.title = kw.get("title") + self.description = kw.get("description") + self.status = 
kw.get("status", "review") + self.priority = kw.get("priority", "medium") + self.tags = kw.get("tags") + self.assigned_agent_id = kw.get("assigned_agent_id") # None -> skip Agent query + self.created_at = kw.get("created_at") + self.started_at = kw.get("started_at") + self.completed_at = kw.get("completed_at") + self.error_message = kw.get("error_message") + + +class _FakeQuery: + def __init__(self, rows): + self._rows = rows + + def filter(self, *a, **k): + return self + + def order_by(self, *a, **k): + return self + + def limit(self, *a, **k): + return self + + def all(self): + return self._rows + + +class _FakeDB: + def __init__(self, rows): + self._rows = rows + + def query(self, model): + return _FakeQuery(self._rows) + + +def test_list_board_tasks_projects_tags_and_description(): + """A queued HARNESS task survives the projection with its tags + description + intact, so the command path can find it by rx tag and parse its body.""" + rows = [ + _FakeRow( + id=7, + title="[HARNESS] heartbeat_tune for ScribeAgent", + description="**Current:** {...}\n**Proposed:** {...}", + status="review", + tags=["harness", "org-review", "risk-4", "rx:rx-esc-1"], + assigned_agent_id=None, + ) + ] + db = _FakeDB(rows) + + result = asyncio.run( + _HANDLER.list_board_tasks(db, _WS_ID, {"tags": ["harness"]}) + ) + + assert result["success"] is True + assert result["total"] == 1 + task = result["tasks"][0] + # The two fields the HARNESS command path depends on must be present. 
+ assert task["tags"] == ["harness", "org-review", "risk-4", "rx:rx-esc-1"] + assert task["description"] == "**Current:** {...}\n**Proposed:** {...}" + + +def test_list_board_tasks_tags_default_empty_list(): + """A row with NULL tags projects to [] (never None), so `"harness" in tags` + in _find_task_by_rx is always a safe membership test.""" + rows = [_FakeRow(id=8, title="plain task", description="d", tags=None, assigned_agent_id=None)] + db = _FakeDB(rows) + + result = asyncio.run(_HANDLER.list_board_tasks(db, _WS_ID, {})) + + assert result["tasks"][0]["tags"] == [] diff --git a/orchestrator/tests/test_harness_commands.py b/orchestrator/tests/test_harness_commands.py new file mode 100644 index 000000000..a3b483364 --- /dev/null +++ b/orchestrator/tests/test_harness_commands.py @@ -0,0 +1,331 @@ +"""PRD-141 US-025: /approve /reject command handler WITH authorization. + +The security-critical story: every command must enforce a workspace-ADMIN check +before any mutation, and an unauthorized caller must change nothing. These are +unit tests — the DB is faked down to the two queries the handler makes +(WorkspaceMember for authz, BoardTask for reject), and the PlatformActionExecutor +is replaced with the in-memory fake from the US-021 suite. Dummy POSTGRES_* and +the apscheduler stub let the harness_service import chain load without a real DB +or the prod-only scheduler dependency. 
+""" +import asyncio +import json +import os +import sys +import types + +os.environ.pop("HARNESS_SELF_MANAGEMENT_ENABLED", None) +os.environ.setdefault("POSTGRES_USER", "test") +os.environ.setdefault("POSTGRES_PASSWORD", "test") +os.environ.setdefault("POSTGRES_HOST", "localhost") +os.environ.setdefault("POSTGRES_PORT", "5432") +os.environ.setdefault("POSTGRES_DB", "test") + + +def _install_fake_apscheduler(): + if "apscheduler" in sys.modules: + return + aps = types.ModuleType("apscheduler") + schedulers = types.ModuleType("apscheduler.schedulers") + asyncio_mod = types.ModuleType("apscheduler.schedulers.asyncio") + asyncio_mod.AsyncIOScheduler = type("AsyncIOScheduler", (), {}) + jobstores = types.ModuleType("apscheduler.jobstores") + memory_mod = types.ModuleType("apscheduler.jobstores.memory") + memory_mod.MemoryJobStore = type("MemoryJobStore", (), {}) + aps.schedulers = schedulers + aps.jobstores = jobstores + schedulers.asyncio = asyncio_mod + jobstores.memory = memory_mod + sys.modules.update({ + "apscheduler": aps, + "apscheduler.schedulers": schedulers, + "apscheduler.schedulers.asyncio": asyncio_mod, + "apscheduler.jobstores": jobstores, + "apscheduler.jobstores.memory": memory_mod, + }) + + +_install_fake_apscheduler() + +from config import config +import api.harness_commands as hc + +_WS_ID = "00000000-0000-0000-0000-000000000001" +_RX_ID = "rx-esc-1" + + +def _ledger_path(volume, workspace_id=_WS_ID): + """The applied-tasks ledger path — _write_applied_tasks writes it and + _read_applied_tasks reads it (the command path persists here on /approve).""" + return os.path.join(str(volume), str(workspace_id), "harness", "applied_tasks.json") + + +def _harness_task( + change_type="heartbeat_tune", + target_name="ScribeAgent", + current=None, + proposed=None, + risk=4, + task_id=7, + rx_id=_RX_ID, +): + """A queued [HARNESS] board task exactly as US-024 tags it for escalation.""" + current = {"interval_minutes": 30} if current is None else current + proposed 
= {"interval_minutes": 90} if proposed is None else proposed + return { + "id": task_id, + "title": f"[HARNESS] {change_type} for {target_name}", + "description": ( + f"**Risk Score:** {risk}/5\n\n" + f"**Change Type:** {change_type}\n\n" + f"**Current:** {json.dumps(current)}\n\n" + f"**Proposed:** {json.dumps(proposed)}\n\n" + f"**Rationale:** because reasons\n\n" + f"**Expected Improvement:** save tokens" + ), + "tags": ["harness", "org-review", f"risk-{risk}", f"rx:{rx_id}"], + } + + +class _FakeExecutor: + """Records execute() calls; returns canned results for the actions the + command handler invokes (list tasks/agents, apply, update status, write).""" + + def __init__(self, tasks, agents): + self._tasks = tasks + self._agents = agents + self.calls = [] + + async def execute(self, action, params): + self.calls.append((action, params)) + if action == "platform_list_tasks": + return {"data": self._tasks} + if action == "platform_list_agents": + return {"data": self._agents} + return {"success": True} + + def actions(self): + return [action for action, _ in self.calls] + + +class _FakeQuery: + def __init__(self, result): + self._result = result + + def filter(self, *args, **kwargs): + return self + + def first(self): + return self._result + + +class _FakeBoardTask: + def __init__(self, task_id): + self.id = task_id + self.workspace_id = _WS_ID + self.status = "review" + self.blocked_reason = None + self.blocked_at = None + + +class _FakeDB: + """Answers the two queries the handler makes, by model name.""" + + def __init__(self, member=None, board_task=None): + self._member = member + self._board_task = board_task + self.committed = False + + def query(self, model): + name = getattr(model, "__name__", "") + if name == "WorkspaceMember": + return _FakeQuery(self._member) + if name == "BoardTask": + return _FakeQuery(self._board_task) + return _FakeQuery(None) + + def commit(self): + self.committed = True + + +_ADMIN_MEMBER = object() # truthy sentinel: an active 
owner/admin row exists +_ADMIN = {"user_id": 5} +_NON_ADMIN = {"user_id": 99} + + +def _patch_executor(monkeypatch, ex): + monkeypatch.setattr(hc, "_make_executor", lambda db, workspace_id: ex) + + +def test_handle_approve_command(monkeypatch, tmp_path): + """Admin /approve applies the prescription now and records it in the ledger.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) + task = _harness_task(task_id=7) + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, _ADMIN) + ) + + assert result["success"] is True + # The change was actually applied to the resolved agent id. + assert ( + "platform_configure_agent_heartbeat", + {"agent_id": 42, "interval_minutes": 90}, + ) in ex.calls + # The board task was marked done and the ledger persisted to disk — the same + # store _read_applied_tasks reads, so a second /approve is a no-op (idempotent). 
+ assert ("platform_update_task_status", {"task_id": 7, "status": "done"}) in ex.calls + assert os.path.exists(_ledger_path(tmp_path)) + ledger = hc.get_harness_service()._read_applied_tasks(_WS_ID) + assert "7" in {str(i) for i in ledger["applied_task_ids"]} + + +def test_handle_reject_command(monkeypatch): + """Admin /reject flips the board task to 'rejected' and commits, so + _get_rejected_signatures suppresses re-proposal next tick.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + task = _harness_task(task_id=7) + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + board_row = _FakeBoardTask(7) + db = _FakeDB(member=_ADMIN_MEMBER, board_task=board_row) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/reject", _RX_ID, _ADMIN) + ) + + assert result["success"] is True + assert board_row.status == "rejected" + assert db.committed is True + # Nothing was applied on a reject. + assert "platform_configure_agent_heartbeat" not in ex.actions() + + +def test_handle_unknown_rx_id(monkeypatch, tmp_path): + """An rx id with no matching queued task returns not-found, no mutation.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) + task = _harness_task(task_id=7, rx_id="rx-different") + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", "rx-missing", _ADMIN) + ) + + assert result["success"] is False + assert "no pending harness change" in result["message"].lower() + assert "platform_configure_agent_heartbeat" not in ex.actions() + + +def test_handle_already_applied(monkeypatch, tmp_path): + """A second /approve is idempotent: the ledger already has the task id, so + nothing is applied again.""" + 
monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) + task = _harness_task(task_id=7) + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + # The shared US-021 ledger already records this task as applied. + monkeypatch.setattr( + hc.get_harness_service(), + "_read_applied_tasks", + lambda workspace_id: {"applied_task_ids": ["7"], "entries": []}, + ) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, _ADMIN) + ) + + assert result["success"] is True + assert result.get("already_applied") is True + assert "platform_configure_agent_heartbeat" not in ex.actions() + # Nothing re-applied -> no fresh ledger write to disk. + assert not os.path.exists(_ledger_path(tmp_path)) + + +def test_handle_approve_rejected_task_refused(monkeypatch, tmp_path): + """A /reject keeps the board task's harness/rx tags, so it still surfaces in + the tag-filtered list and _find_task_by_rx still matches it. A later /approve + must NOT resurrect it: refused before any apply or ledger write.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) + task = _harness_task(task_id=7) + task["status"] = "rejected" # already rejected; tags intact + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, _ADMIN) + ) + + assert result["success"] is False + assert "already rejected" in result["message"].lower() + # Nothing applied, no ledger written to disk. 
+ assert "platform_configure_agent_heartbeat" not in ex.actions() + assert not os.path.exists(_ledger_path(tmp_path)) + + +def test_unauthorized_caller_rejected(monkeypatch): + """A non-admin caller is refused before ANY state is read or written — + no task lookup, no apply, no commit. This is the US-026 security boundary.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + task = _harness_task(task_id=7) + ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + # member=None -> the WorkspaceMember admin query finds no row -> not authorized. + board_row = _FakeBoardTask(7) + db = _FakeDB(member=None, board_task=board_row) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, _NON_ADMIN) + ) + + assert result["success"] is False + assert result.get("unauthorized") is True + # Refused before any executor call or DB mutation. + assert ex.calls == [] + assert db.committed is False + assert board_row.status == "review" + + +def test_malformed_identity_shapes_refused(monkeypatch): + """Every malformed/hostile caller_identity is fail-closed refused before any + state is touched — including a bool (which would coerce to user 1) and a + non-positive id. 
An admin member row exists, so only the identity shape can + grant or deny access here.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + for identity in (None, {}, {"user_id": None}, {"user_id": 0}, {"user_id": True}, + {"user_id": -1}, {"user_id": "abc"}, "not-a-dict"): + ex = _FakeExecutor(tasks=[_harness_task()], agents=[{"id": 42, "name": "ScribeAgent"}]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, identity) + ) + assert result["success"] is False, identity + assert result.get("unauthorized") is True, identity + assert ex.calls == [], identity + + +def test_disabled_flag_is_noop(monkeypatch): + """Flag off -> every command is inert, regardless of caller.""" + monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", False) + ex = _FakeExecutor(tasks=[_harness_task()], agents=[]) + _patch_executor(monkeypatch, ex) + db = _FakeDB(member=_ADMIN_MEMBER) + + result = asyncio.run( + hc.handle_harness_command(db, _WS_ID, "/approve", _RX_ID, _ADMIN) + ) + + assert result["success"] is False + assert "disabled" in result["message"].lower() + assert ex.calls == [] diff --git a/orchestrator/tests/test_harness_self_management.py b/orchestrator/tests/test_harness_self_management.py index 2c4249771..001631619 100644 --- a/orchestrator/tests/test_harness_self_management.py +++ b/orchestrator/tests/test_harness_self_management.py @@ -129,15 +129,19 @@ def test_flag_defaults_false(): # --------------------------------------------------------------------------- _WS_ID = "00000000-0000-0000-0000-000000000001" -# A path that does not exist, so _read_applied_tasks finds no ledger and treats -# every task as un-applied. The ledger WRITE goes through the fake executor's -# workspace_write_file (in-memory), so no real filesystem I/O occurs. 
-_MISSING_VOLUME = "/tmp/harness-self-mgmt-test-no-such-volume" + + +def _ledger_path(volume, workspace_id=_WS_ID): + """The applied-tasks ledger path — _write_applied_tasks writes it and + _read_applied_tasks reads it (proves they share one on-disk store).""" + return os.path.join(str(volume), str(workspace_id), "harness", "applied_tasks.json") class _FakeExecutor: """Records every execute() call and returns canned results for the actions - _apply_approved_board_tasks invokes (list tasks/agents, apply, write file).""" + _apply_approved_board_tasks invokes (list tasks/agents, apply). The ledger + write no longer goes through the executor — it is a direct disk write — so + the fake never sees a workspace_write_file call.""" def __init__(self, tasks, agents): self._tasks = tasks @@ -150,7 +154,7 @@ async def execute(self, action, params): return {"data": self._tasks} if action == "platform_list_agents": return {"data": self._agents} - # apply actions + workspace_write_file all report success + # apply + status-update actions all report success return {"success": True} def actions(self): @@ -172,11 +176,12 @@ def test_approved_tasks_noop_when_flag_off(monkeypatch): assert changelog == {} -def test_approved_tasks_are_executed(monkeypatch): +def test_approved_tasks_are_executed(monkeypatch, tmp_path): """Flag on -> a done [HARNESS] task is parsed, its target resolved, and the - change applied via _auto_apply_prescription, then the ledger is persisted.""" + change applied via _auto_apply_prescription, then the ledger is persisted to + disk — the same store _read_applied_tasks reads, so the next tick is idempotent.""" monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) - monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", _MISSING_VOLUME) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) svc = HarnessService() task = _harness_task( change_type="heartbeat_tune", @@ -199,15 +204,18 @@ def test_approved_tasks_are_executed(monkeypatch): 
assert applied[0]["task_id"] == "7" assert applied[0]["target_id"] == 42 assert applied[0]["change_type"] == "heartbeat_tune" - # Idempotency ledger was persisted via the workspace file store. - assert "workspace_write_file" in ex.actions() + # Idempotency ledger round-tripped to disk: the file exists at the path the + # reader uses, and _read_applied_tasks sees task 7 as applied. + assert os.path.exists(_ledger_path(tmp_path)) + ledger = svc._read_applied_tasks(_WS_ID) + assert "7" in {str(i) for i in ledger["applied_task_ids"]} -def test_snapshot_recorded_before_apply(monkeypatch): +def test_snapshot_recorded_before_apply(monkeypatch, tmp_path): """The pre-change value is captured as current_value_before so US-022 has a rollback target.""" monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) - monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", _MISSING_VOLUME) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) svc = HarnessService() task = _harness_task( change_type="heartbeat_tune", @@ -225,9 +233,10 @@ def test_snapshot_recorded_before_apply(monkeypatch): assert entry["proposed_value"] == {"interval_minutes": 90} -def test_already_applied_task_is_skipped(monkeypatch): +def test_already_applied_task_is_skipped(monkeypatch, tmp_path): """A task whose id is in the ledger is never re-applied (idempotency).""" monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) svc = HarnessService() monkeypatch.setattr( svc, "_read_applied_tasks", @@ -240,15 +249,15 @@ def test_already_applied_task_is_skipped(monkeypatch): assert "platform_configure_agent_heartbeat" not in ex.actions() assert changelog.get("applied_from_approved", []) == [] - # Nothing applied -> no ledger write. - assert "workspace_write_file" not in ex.actions() + # Nothing applied -> the ledger file is never written. 
+ assert not os.path.exists(_ledger_path(tmp_path)) -def test_unresolved_target_is_skipped_not_applied(monkeypatch): +def test_unresolved_target_is_skipped_not_applied(monkeypatch, tmp_path): """A task whose target_name is not in the agents map is skipped, never applied against a guessed/null target.""" monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) - monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", _MISSING_VOLUME) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) svc = HarnessService() task = _harness_task(change_type="heartbeat_tune", target_name="GhostAgent", task_id=9) ex = _FakeExecutor(tasks=[task], agents=[{"id": 42, "name": "ScribeAgent"}]) @@ -261,11 +270,11 @@ def test_unresolved_target_is_skipped_not_applied(monkeypatch): assert changelog["skipped"][0]["task_id"] == "9" -def test_placeholder_proposed_value_is_refused(monkeypatch): +def test_placeholder_proposed_value_is_refused(monkeypatch, tmp_path): """An approved task whose proposed_value is the 'review_needed' placeholder is never applied — applying it literally would corrupt the agent.""" monkeypatch.setattr(config, "HARNESS_SELF_MANAGEMENT_ENABLED", True) - monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", _MISSING_VOLUME) + monkeypatch.setattr(config, "WORKSPACE_VOLUME_PATH", str(tmp_path)) svc = HarnessService() task = _harness_task( change_type="description_update", @@ -282,8 +291,8 @@ def test_placeholder_proposed_value_is_refused(monkeypatch): assert changelog.get("applied_from_approved", []) == [] assert len(changelog.get("failed", [])) == 1 assert "placeholder" in changelog["failed"][0]["error"] - # Not applied -> not ledgered. - assert "workspace_write_file" not in ex.actions() + # Not applied -> the ledger file is never written. 
+ assert not os.path.exists(_ledger_path(tmp_path)) # --------------------------------------------------------------------------- @@ -524,3 +533,93 @@ def test_model_change_uses_model_id_param(): "platform_update_agent", {"agent_id": 42, "model_id": "claude-sonnet-4-6"}, ) in ex.calls + + +# --------------------------------------------------------------------------- +# US-024: high-risk escalation +# --------------------------------------------------------------------------- + + +def _high_risk_rx(rx_id="rx-esc", risk=4): + return { + "prescription_id": rx_id, + "change_type": "model_change_same_tier", + "target_name": "ScribeAgent", + "current_value": {"model": "haiku"}, + "proposed_value": {"model": "opus"}, + "risk_score": risk, + } + + +def test_escalation_sends_telegram(monkeypatch): + """risk>=4 with a connected channel -> notification sent + escalated entry.""" + import core.services.notification_service as notif_mod + sent = [] + + async def _fake_send(workspace_id, message, channel=None): + sent.append((workspace_id, message, channel)) + return True + + monkeypatch.setattr(notif_mod, "send_workspace_notification", _fake_send) + + svc = HarnessService() + svc._workspace_has_channel = lambda db, ws: True + changelog = {"escalated": []} + asyncio.run(svc._maybe_escalate(None, _WS_ID, _high_risk_rx(), changelog)) + + assert len(sent) == 1 + _, message, _ = sent[0] + assert "/approve rx-esc" in message and "/reject rx-esc" in message + assert len(changelog["escalated"]) == 1 + assert changelog["escalated"][0]["notified"] is True + assert changelog["escalated"][0]["prescription_id"] == "rx-esc" + + +def test_escalation_skipped_no_channel(monkeypatch): + """No connected channel -> no send, no phantom escalated entry.""" + import core.services.notification_service as notif_mod + sent = [] + + async def _fake_send(workspace_id, message, channel=None): + sent.append((workspace_id, message, channel)) + return True + + monkeypatch.setattr(notif_mod, 
"send_workspace_notification", _fake_send) + + svc = HarnessService() + svc._workspace_has_channel = lambda db, ws: False + changelog = {"escalated": []} + asyncio.run(svc._maybe_escalate(None, _WS_ID, _high_risk_rx(), changelog)) + + assert sent == [] + assert changelog["escalated"] == [] + + +def test_low_risk_is_not_escalated(monkeypatch): + """A channel exists but risk < 4 -> not escalated (only high-risk nags).""" + import core.services.notification_service as notif_mod + sent = [] + + async def _fake_send(workspace_id, message, channel=None): + sent.append(1) + return True + + monkeypatch.setattr(notif_mod, "send_workspace_notification", _fake_send) + + svc = HarnessService() + svc._workspace_has_channel = lambda db, ws: True + changelog = {"escalated": []} + asyncio.run(svc._maybe_escalate(None, _WS_ID, _high_risk_rx(risk=2), changelog)) + + assert sent == [] + assert changelog["escalated"] == [] + + +def test_escalation_message_has_approve_reject(): + """The message carries the /approve|/reject instructions US-025 parses.""" + svc = HarnessService() + msg = svc._build_escalation_message(_high_risk_rx(rx_id="rx-99", risk=5), 5) + assert "/approve rx-99" in msg + assert "/reject rx-99" in msg + assert "risk 5/5" in msg + assert "model_change_same_tier" in msg