From b1fce8b77cd6928dfc72f1f2509a7075247b7601 Mon Sep 17 00:00:00 2001 From: Gerard Kavanagh Date: Thu, 28 May 2026 23:29:47 +0100 Subject: [PATCH 1/3] feat(routing): GraphRouter penalizes negative affinities (PRD-141 US-017) Split _query_affinities() to return explicit (positive_boosts, negative_penalties) dicts instead of a single netted value: succeeds_for_intent and agent_prefers feed positive_boosts; fails_for_intent feeds negative_penalties as a positive magnitude. _expand_with_graph now scores edge chains as cosine*edge_confidence + boost - penalty, so a tool that historically fails for an intent ranks lower. Behaviour was previously netted implicitly; this makes the negative signal explicit and independently testable. --- .../modules/tools/discovery/graph_router.py | 42 ++-- .../tests/test_graph_router_negative.py | 208 ++++++++++++++++++ 2 files changed, 235 insertions(+), 15 deletions(-) create mode 100644 orchestrator/tests/test_graph_router_negative.py diff --git a/orchestrator/modules/tools/discovery/graph_router.py b/orchestrator/modules/tools/discovery/graph_router.py index 155a6bf09..39ac5068a 100644 --- a/orchestrator/modules/tools/discovery/graph_router.py +++ b/orchestrator/modules/tools/discovery/graph_router.py @@ -188,7 +188,7 @@ def _expand_with_graph( all_action_names = set(entry_action_names) for edge in edges: all_action_names.add(edge["to_action"]) - affinities = self._query_affinities( + positive_boosts, negative_penalties = self._query_affinities( db, list(all_action_names), agent_id if use_agent_scope else None, ) @@ -204,12 +204,15 @@ def _expand_with_graph( cosine = cosine_by_name.get(from_action, 0.0) edge_confidence = edge["confidence"] - # Affinity boosts for the chain actions + # Affinity boosts (succeeds/prefers) lift the chain; negative + # penalties (fails_for_intent) lower it — PRD-141 US-017. boost = 0.0 + penalty = 0.0 for action in (from_action, to_action): - boost += affinities.get(action, 0.0) + boost += positive_boosts.get(action, 0.0) + penalty += negative_penalties.get(action, 0.0) - score = cosine * edge_confidence + boost + score = cosine * edge_confidence + boost - penalty chains.append((from_action, score, [from_action, to_action])) expanded += 1 @@ -288,13 +291,23 @@ def _query_affinities( db, action_names: List[str], agent_id: Optional[int], - ) -> dict: - """Query tool_routing_affinities and return action_name -> total boost.""" + ) -> Tuple[dict, dict]: + """Query tool_routing_affinities, returning (positive_boosts, negative_penalties). + + PRD-141 US-017: positive and negative signals are kept in separate dicts + rather than netted into one, so the caller can apply them explicitly as + ``score = cosine * edge_confidence + boost - penalty``. + + * ``succeeds_for_intent`` / ``agent_prefers`` -> positive_boosts[action] + += weight*confidence. + * ``fails_for_intent`` -> negative_penalties[action] += weight*confidence, + recorded as a POSITIVE magnitude (the caller subtracts it). + """ from sqlalchemy import and_, or_ from core.models.tool_routing import ToolRoutingAffinity if not action_names: - return {} + return {}, {} filters = [ ToolRoutingAffinity.action_name.in_(action_names), @@ -316,17 +329,16 @@ def _query_affinities( .all() ) - boosts: dict = {} + positive_boosts: dict = {} + negative_penalties: dict = {} for r in rows: - # succeeds_for_intent adds positive boost; fails subtracts - if r.affinity_type == "succeeds_for_intent": - boosts[r.action_name] = boosts.get(r.action_name, 0.0) + r.weight * r.confidence + magnitude = r.weight * r.confidence + if r.affinity_type in ("succeeds_for_intent", "agent_prefers"): + positive_boosts[r.action_name] = positive_boosts.get(r.action_name, 0.0) + magnitude elif r.affinity_type == "fails_for_intent": - boosts[r.action_name] = boosts.get(r.action_name, 0.0) - r.weight * r.confidence - elif r.affinity_type == "agent_prefers": - boosts[r.action_name] = boosts.get(r.action_name, 0.0) + r.weight * r.confidence + negative_penalties[r.action_name] = negative_penalties.get(r.action_name, 0.0) + magnitude - return boosts + return positive_boosts, negative_penalties # ------------------------------------------------------------------ # Helpers diff --git a/orchestrator/tests/test_graph_router_negative.py b/orchestrator/tests/test_graph_router_negative.py new file mode 100644 index 000000000..62f767f17 --- /dev/null +++ b/orchestrator/tests/test_graph_router_negative.py @@ -0,0 +1,208 @@ +""" +PRD-141 US-017: GraphRouter penalizes negative affinities. +========================================================== + +``_query_affinities`` now returns an explicit ``(positive_boosts, +negative_penalties)`` pair instead of a single netted dict: + + * ``succeeds_for_intent`` / ``agent_prefers`` -> positive_boosts[action] += weight*confidence + * ``fails_for_intent`` -> negative_penalties[action] += weight*confidence (stored as a POSITIVE magnitude) + +``_expand_with_graph`` then scores each edge-expanded chain as +``score = cosine * edge_confidence + boost - penalty`` so a tool that +historically fails for an intent ranks lower. + +``modules.tools.__init__`` eagerly imports the DB-backed executor chain, so we +leaf-load ``graph_router.py`` under a synthetic package and inject a fake +``action_semantic_index`` (module-top import) plus a fake +``core.database.database`` (lazy import inside ``_expand_with_graph``) into +``sys.modules``. ``core.models.tool_routing`` imports cleanly (only needs the +declarative Base), so ``_query_affinities`` runs against the REAL model with a +fake DB session. +""" +import importlib.util +import sys +import types +from contextlib import contextmanager +from pathlib import Path +from types import SimpleNamespace + +import pytest + +_orchestrator_root = Path(__file__).resolve().parent.parent +if str(_orchestrator_root) not in sys.path: + sys.path.insert(0, str(_orchestrator_root)) + +_discovery_dir = _orchestrator_root / "modules" / "tools" / "discovery" +_PKG = "_us017_graph" + + +def _load_graph_router(): + if _PKG not in sys.modules: + pkg = types.ModuleType(_PKG) + pkg.__path__ = [str(_discovery_dir)] + sys.modules[_PKG] = pkg + + # Fake the module-top `from .action_semantic_index import get_action_semantic_index` + # so loading graph_router doesn't pull numpy + the registry/embedding stack. + asi_name = f"{_PKG}.action_semantic_index" + if asi_name not in sys.modules: + fake_asi = types.ModuleType(asi_name) + fake_asi.get_action_semantic_index = lambda: SimpleNamespace( + rank_actions=lambda *a, **k: [] + ) + sys.modules[asi_name] = fake_asi + + full = f"{_PKG}.graph_router" + if full in sys.modules: + return sys.modules[full] + spec = importlib.util.spec_from_file_location(full, _discovery_dir / "graph_router.py") + module = importlib.util.module_from_spec(spec) + module.__package__ = _PKG + sys.modules[full] = module + spec.loader.exec_module(module) + return module + + +_graph_mod = _load_graph_router() +GraphRouter = _graph_mod.GraphRouter + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + +class _FakeQuery: + def __init__(self, rows): + self._rows = rows + + def filter(self, *a, **k): + return self + + def all(self): + return self._rows + + +class _FakeAffinityDB: + """Minimal stand-in: query(...).filter(...).all() yields the canned rows.""" + + def __init__(self, rows): + self._rows = rows + + def query(self, *a, **k): + return _FakeQuery(self._rows) + + +def _aff(action_name, affinity_type, weight, confidence): + return SimpleNamespace( + action_name=action_name, + affinity_type=affinity_type, + weight=weight, + confidence=confidence, + ) + + +@pytest.fixture +def router(monkeypatch): + """A GraphRouter with the lazy `core.database.database` import faked out.""" + fake_db_mod = types.ModuleType("core.database.database") + + @contextmanager + def _fake_session(): + yield object() + + fake_db_mod.get_db_session = _fake_session + monkeypatch.setitem(sys.modules, "core.database.database", fake_db_mod) + return GraphRouter() + + +# --------------------------------------------------------------------------- +# _query_affinities now returns (positive_boosts, negative_penalties) +# --------------------------------------------------------------------------- + +def test_query_affinities_splits_positive_and_negative(): + rows = [ + _aff("good", "succeeds_for_intent", weight=1.0, confidence=0.8), + _aff("pref", "agent_prefers", weight=0.5, confidence=0.6), + _aff("bad", "fails_for_intent", weight=1.0, confidence=0.5), + ] + positive, negative = GraphRouter._query_affinities( + _FakeAffinityDB(rows), ["good", "pref", "bad"], None + ) + + assert positive["good"] == pytest.approx(0.8) + assert positive["pref"] == pytest.approx(0.3) + # fails_for_intent is recorded as a POSITIVE magnitude in the penalties dict + assert negative["bad"] == pytest.approx(0.5) + # no cross-contamination between the two dicts + assert "bad" not in positive + assert "good" not in negative and "pref" not in negative + + +def test_negative_affinity_penalizes_score(router, monkeypatch): + """A fails_for_intent action subtracts its penalty from the chain score.""" + monkeypatch.setattr( + router, "_query_edges", + lambda db, names, conf, aid: [ + {"from_action": "bad_tool", "to_action": "next", "confidence": 1.0, + "weight": 1.0, "agent_id": None}, + ], + ) + monkeypatch.setattr( + router, "_query_affinities", + lambda db, names, aid: ({}, {"bad_tool": 0.5}), + ) + + chains = router._expand_with_graph([("bad_tool", 0.8)], agent_id=None) + + expanded = [c for c in chains if c[2] == ["bad_tool", "next"]] + assert len(expanded) == 1 + # 0.8 (cosine) * 1.0 (edge_conf) + 0 (boost) - 0.5 (penalty) = 0.3 + assert expanded[0][1] == pytest.approx(0.3) + # penalty demonstrably lowered the score below the un-penalized cosine*conf + assert expanded[0][1] < 0.8 + + +def test_negative_signals_reduce_ranking(router, monkeypatch): + """Of two equal-cosine chains, the one whose action fails ranks lower.""" + monkeypatch.setattr( + router, "_query_edges", + lambda db, names, conf, aid: [ + {"from_action": "tool_a", "to_action": "x", "confidence": 1.0, + "weight": 1.0, "agent_id": None}, + {"from_action": "tool_b", "to_action": "y", "confidence": 1.0, + "weight": 1.0, "agent_id": None}, + ], + ) + monkeypatch.setattr( + router, "_query_affinities", + lambda db, names, aid: ({}, {"tool_b": 0.5}), + ) + + chains = router._expand_with_graph([("tool_a", 0.9), ("tool_b", 0.9)], agent_id=None) + + score = {c[0]: c[1] for c in chains if len(c[2]) == 2} + assert score["tool_a"] == pytest.approx(0.9) # no penalty + assert score["tool_b"] == pytest.approx(0.4) # 0.9 - 0.5 + assert score["tool_a"] > score["tool_b"] + + +def test_positive_boost_still_applies(router, monkeypatch): + """The refactor preserves positive boosts (regression guard).""" + monkeypatch.setattr( + router, "_query_edges", + lambda db, names, conf, aid: [ + {"from_action": "tool_a", "to_action": "x", "confidence": 1.0, + "weight": 1.0, "agent_id": None}, + ], + ) + monkeypatch.setattr( + router, "_query_affinities", + lambda db, names, aid: ({"tool_a": 0.3}, {}), + ) + + chains = router._expand_with_graph([("tool_a", 0.5)], agent_id=None) + + expanded = [c for c in chains if c[2] == ["tool_a", "x"]] + # 0.5 * 1.0 + 0.3 - 0 = 0.8 + assert expanded[0][1] == pytest.approx(0.8) From 7ad4698352bab6c187a06d9fd17b0284cbe093fa Mon Sep 17 00:00:00 2001 From: Gerard Kavanagh Date: Thu, 28 May 2026 23:45:29 +0100 Subject: [PATCH 2/3] feat(routing): add failed_after edge type to batch edge builder (PRD-141 US-018) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Record risky tool transitions in the nightly edge build: when tool A succeeds and a tool B within the next 2 steps of the same session errors, emit a failed_after(A, B) edge. Tracks both the failure count and the total (A-succeeded, B-within-2) co-occurrences so confidence is the Wilson lower bound of the failure RATE (failed/total), reusing the existing wilson_lower_bound() rather than a raw count. - _compute_failed_after_edges(): same session grouping/windowing as used_after; A must have succeeded; self-edges skipped; returns {(from,to,ws,agent): (failed, total)} only for pairs with >=1 failure. - _upsert_failed_after_edges(): writes edge_type='failed_after' to the same tool_routing_edges table. uq_tre_full_key includes edge_type, so these coexist with used_after rows (no migration needed; edge_type is String(50)). - Extracted _upsert_edge_row() shared by used_after and failed_after upserts. - EdgeBuildSummary.failed_edges_built + build_edges() wiring. GraphRouter._query_edges only follows edge_type=='used_after', so failed_after edges are recorded for analysis/de-ranking but NEVER expanded into chains; test_failed_after_edge_not_expanded guards this at the DB-filter layer. Also removes a dead, harmful MagicMock of core.database.base from test_prd139_edge_builder.py: it built Mock-based ToolRoutingEdge/Affinity classes that corrupted sibling tests sharing the process (the real cause of the only cross-file failures). The mock never helped — that file already requires DB creds to collect, and real Base imports cleanly. Tests: 86 passed across the routing/graph unit corpus (test_prd139_edge_builder + test_graph_router{,_negative} + us014/us015 + tool_routing_models). py_compile OK. --- orchestrator/core/services/edge_builder.py | 175 +++++++++++++++--- .../tests/test_graph_router_negative.py | 86 +++++++++ .../tests/test_prd139_edge_builder.py | 144 +++++++++++++- 3 files changed, 372 insertions(+), 33 deletions(-) diff --git a/orchestrator/core/services/edge_builder.py b/orchestrator/core/services/edge_builder.py index 9e6cabea0..f560b13a8 100644 --- a/orchestrator/core/services/edge_builder.py +++ b/orchestrator/core/services/edge_builder.py @@ -46,6 +46,7 @@ class EdgeBuildSummary: """Summary returned after an edge-build run.""" edges_built: int = 0 + failed_edges_built: int = 0 affinities_built: int = 0 intent_clusters: int = 0 logs_processed: int = 0 @@ -91,6 +92,13 @@ async def build_edges(window: timedelta = timedelta(days=30)) -> EdgeBuildSummar edge_data = _compute_used_after_edges(logs) summary.edges_built = _upsert_edges(db, edge_data) + # 2b. Compute failed_after edges (PRD-141 US-018): A succeeded then a + # tool within 2 steps errored. Same table, distinct edge_type; + # GraphRouter._query_edges only follows used_after, so these are + # recorded for analysis / de-ranking but never expanded into chains. + failed_data = _compute_failed_after_edges(logs) + summary.failed_edges_built = _upsert_failed_after_edges(db, failed_data) + # 3. Compute intent clusters from query embeddings cluster_map = await _compute_and_upsert_clusters(db, logs) summary.intent_clusters = len(cluster_map) @@ -103,7 +111,8 @@ async def build_edges(window: timedelta = timedelta(days=30)) -> EdgeBuildSummar summary.duration_ms = elapsed_ms logger.info( - f"EdgeBuilder: built {summary.edges_built} edges, " + f"EdgeBuilder: built {summary.edges_built} used_after edges, " + f"{summary.failed_edges_built} failed_after edges, " f"{summary.affinities_built} affinities across " f"{summary.intent_clusters} intent clusters" ) @@ -199,6 +208,67 @@ def _compute_used_after_edges( return dict(edge_counts) +def _compute_failed_after_edges( + logs: List[Dict[str, Any]], +) -> Dict[Tuple[str, str, Optional[str], Optional[int]], Tuple[int, int]]: + """Compute failed_after(A, B) edges. + + Within a session, when tool A SUCCEEDS and a later tool B (within the next + 2 steps) ERRORS, that is evidence the A->B transition is risky. We track + BOTH the failure count and the total number of (A-succeeded, B-within-2) + co-occurrences, so the edge confidence can be the Wilson lower bound of the + failure RATE (failed / total) rather than a raw count -- a pair that fails + 3-of-100 times should not look as dangerous as one that fails 30-of-40. + + Same grouping/windowing as used_after. failed_after edges are written to the + same table under a distinct edge_type; GraphRouter._query_edges only follows + 'used_after', so these never become recommended chains. + + Returns: + Dict mapping (from_action, to_action, workspace_id, agent_id) + -> (failed_count, total_count), only for pairs with >=1 failure. + """ + sessions: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for log in logs: + key = _derive_session_key(log) + sessions[key].append(log) + + failed_counts: Dict[Tuple[str, str, Optional[str], Optional[int]], int] = defaultdict(int) + total_counts: Dict[Tuple[str, str, Optional[str], Optional[int]], int] = defaultdict(int) + + for session_key, session_logs in sessions.items(): + if session_key.startswith("agent:"): + windows = _split_by_time_window(session_logs, window_seconds=300) + else: + windows = [session_logs] + + for window_logs in windows: + for i in range(len(window_logs)): + a = window_logs[i] + if a.get("status") != "success": + continue # A must have succeeded to originate a failed_after edge + # Look ahead up to 2 steps within the same window + for j in range(i + 1, min(i + 3, len(window_logs))): + b = window_logs[j] + if a["action_name"] == b["action_name"]: + continue # Skip self-edges (chains never loop) + edge_key = ( + a["action_name"], + b["action_name"], + a.get("workspace_id"), + a.get("agent_id"), + ) + total_counts[edge_key] += 1 + if b.get("status") != "success": + failed_counts[edge_key] += 1 + + return { + key: (failed_counts[key], total) + for key, total in total_counts.items() + if failed_counts.get(key, 0) > 0 + } + + def _split_by_time_window( logs: List[Dict[str, Any]], window_seconds: int = 300 ) -> List[List[Dict[str, Any]]]: @@ -216,11 +286,55 @@ def _split_by_time_window( return windows +def _upsert_edge_row( + db: Session, + from_action: str, + to_action: str, + edge_type: str, + workspace_id: Optional[str], + agent_id: Optional[int], + weight: float, + confidence: float, + sample_count: int, + now: datetime, +) -> None: + """Upsert a single routing edge (any edge_type) using ON CONFLICT UPDATE. + + The unique key uq_tre_full_key includes edge_type, so used_after and + failed_after rows for the same (from, to, scope) coexist without clobbering. + """ + stmt = text(""" + INSERT INTO tool_routing_edges + (from_action, to_action, edge_type, workspace_id, agent_id, + weight, confidence, sample_count, last_updated) + VALUES + (:from_action, :to_action, :edge_type, :workspace_id, :agent_id, + :weight, :confidence, :sample_count, :last_updated) + ON CONFLICT ON CONSTRAINT uq_tre_full_key + DO UPDATE SET + weight = :weight, + confidence = :confidence, + sample_count = :sample_count, + last_updated = :last_updated + """) + db.execute(stmt, { + "from_action": from_action, + "to_action": to_action, + "edge_type": edge_type, + "workspace_id": workspace_id, + "agent_id": agent_id, + "weight": weight, + "confidence": confidence, + "sample_count": sample_count, + "last_updated": now, + }) + + def _upsert_edges( db: Session, edge_data: Dict[Tuple[str, str, Optional[str], Optional[int]], int], ) -> int: - """Upsert edges into tool_routing_edges using ON CONFLICT UPDATE.""" + """Upsert used_after edges into tool_routing_edges.""" count = 0 now = datetime.utcnow() @@ -230,32 +344,39 @@ def _upsert_edges( weight = float(sample_count) confidence = wilson_lower_bound(sample_count, sample_count) + _upsert_edge_row( + db, from_action, to_action, "used_after", + workspace_id, agent_id, weight, confidence, sample_count, now, + ) + count += 1 - # Use raw SQL for ON CONFLICT upsert - stmt = text(""" - INSERT INTO tool_routing_edges - (from_action, to_action, edge_type, workspace_id, agent_id, - weight, confidence, sample_count, last_updated) - VALUES - (:from_action, :to_action, 'used_after', :workspace_id, :agent_id, - :weight, :confidence, :sample_count, :last_updated) - ON CONFLICT ON CONSTRAINT uq_tre_full_key - DO UPDATE SET - weight = :weight, - confidence = :confidence, - sample_count = :sample_count, - last_updated = :last_updated - """) - db.execute(stmt, { - "from_action": from_action, - "to_action": to_action, - "workspace_id": workspace_id, - "agent_id": agent_id, - "weight": weight, - "confidence": confidence, - "sample_count": sample_count, - "last_updated": now, - }) + db.flush() + return count + + +def _upsert_failed_after_edges( + db: Session, + failed_data: Dict[Tuple[str, str, Optional[str], Optional[int]], Tuple[int, int]], +) -> int: + """Upsert failed_after edges into tool_routing_edges. + + weight = failure count; sample_count = total co-occurrences; confidence = + Wilson lower bound of the failure rate (failed / total). The _SAMPLE_FLOOR + is applied to the total co-occurrences, matching used_after's floor. + """ + count = 0 + now = datetime.utcnow() + + for (from_action, to_action, workspace_id, agent_id), (failed, total) in failed_data.items(): + if total < _SAMPLE_FLOOR: + continue + + weight = float(failed) + confidence = wilson_lower_bound(failed, total) + _upsert_edge_row( + db, from_action, to_action, "failed_after", + workspace_id, agent_id, weight, confidence, total, now, + ) count += 1 db.flush() diff --git a/orchestrator/tests/test_graph_router_negative.py b/orchestrator/tests/test_graph_router_negative.py index 62f767f17..b5a0c2201 100644 --- a/orchestrator/tests/test_graph_router_negative.py +++ b/orchestrator/tests/test_graph_router_negative.py @@ -102,6 +102,69 @@ def _aff(action_name, affinity_type, weight, confidence): ) +def _extract_edge_type(filter_clause): + """Pull the `edge_type == X` literal out of the and_() expression that + GraphRouter._query_edges passes to db.query(...).filter(...). + + Walks the BooleanClauseList for the binary expression whose left column is + `edge_type` and returns its bound value, so the fake DB can honour the same + filter the real query would apply. + """ + for clause in getattr(filter_clause, "clauses", [filter_clause]): + left = getattr(clause, "left", None) + right = getattr(clause, "right", None) + if getattr(left, "key", None) == "edge_type" and right is not None: + return getattr(right, "value", None) + return None + + +class _FakeEdgeQuery: + """query(...).filter(...).order_by(...).limit(...).all() that respects the + edge_type filter built by _query_edges (so failed_after rows are excluded + exactly as the real SQL would exclude them).""" + + def __init__(self, rows): + self._rows = rows + self._edge_type = None + + def filter(self, *clauses): + for c in clauses: + et = _extract_edge_type(c) + if et is not None: + self._edge_type = et + return self + + def order_by(self, *a, **k): + return self + + def limit(self, *a, **k): + return self + + def all(self): + if self._edge_type is None: + return list(self._rows) + return [r for r in self._rows if r.edge_type == self._edge_type] + + +class _FakeEdgeDB: + def __init__(self, rows): + self._rows = rows + + def query(self, *a, **k): + return _FakeEdgeQuery(self._rows) + + +def _edge(from_action, to_action, edge_type, confidence=1.0, weight=1.0, agent_id=None): + return SimpleNamespace( + from_action=from_action, + to_action=to_action, + edge_type=edge_type, + confidence=confidence, + weight=weight, + agent_id=agent_id, + ) + + @pytest.fixture def router(monkeypatch): """A GraphRouter with the lazy `core.database.database` import faked out.""" @@ -206,3 +269,26 @@ def test_positive_boost_still_applies(router, monkeypatch): expanded = [c for c in chains if c[2] == ["tool_a", "x"]] # 0.5 * 1.0 + 0.3 - 0 = 0.8 assert expanded[0][1] == pytest.approx(0.8) + + +# --------------------------------------------------------------------------- +# PRD-141 US-018: failed_after edges are never expanded into chains +# --------------------------------------------------------------------------- + +def test_failed_after_edge_not_expanded(): + """_query_edges only ever requests edge_type == 'used_after', so a + failed_after edge sitting in the same table is filtered out at the DB layer + and can never become a recommended chain. + """ + rows = [ + _edge("good", "next", "used_after", confidence=1.0), + _edge("good", "bad", "failed_after", confidence=1.0), + ] + fake_db = _FakeEdgeDB(rows) + + edges = GraphRouter._query_edges(fake_db, ["good"], 0.6, None) + + to_actions = {e["to_action"] for e in edges} + assert "next" in to_actions # used_after IS followed + assert "bad" not in to_actions # failed_after is NOT followed + assert len(edges) == 1 diff --git a/orchestrator/tests/test_prd139_edge_builder.py b/orchestrator/tests/test_prd139_edge_builder.py index 8b2570eb0..16ada7077 100644 --- a/orchestrator/tests/test_prd139_edge_builder.py +++ b/orchestrator/tests/test_prd139_edge_builder.py @@ -16,7 +16,6 @@ from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional -from unittest.mock import MagicMock, patch, AsyncMock from uuid import uuid4 import numpy as np @@ -29,12 +28,13 @@ # --------------------------------------------------------------------------- -# Import modules under test (with mocks for DB/embedding dependencies) +# Import modules under test # --------------------------------------------------------------------------- - -# Mock the database and models before importing edge_builder -_mock_base = MagicMock() -sys.modules.setdefault("core.database.base", _mock_base) +# NOTE: do NOT mock core.database.base here. These tests exercise pure helpers +# but import the REAL ToolRoutingEdge/ToolRoutingAffinity models (via +# edge_builder), and a MagicMock Base would build Mock-based model classes that +# corrupt sibling tests sharing the same process (e.g. test_graph_router_negative +# uses the real models with a fake DB session). Real Base imports cleanly. # Import intent_clustering directly (pure numpy, no DB deps) @@ -48,6 +48,7 @@ from core.services.edge_builder import ( wilson_lower_bound, _compute_used_after_edges, + _compute_failed_after_edges, _derive_session_key, _split_by_time_window, _compute_affinities, @@ -93,6 +94,22 @@ def test_monotonic_in_successes(self): r3 = wilson_lower_bound(90, 100) assert r1 < r2 < r3 + def test_wilson_lower_bound_with_failures(self): + """PRD-141 US-018: failed_after confidence is the Wilson lower bound of + the FAILURE RATE (failures / co-occurrences), so failures < total. + + 3 failures out of 10 co-occurrences = point estimate 0.3; the + conservative lower bound sits below that, and more failures at the same + total raise the bound (so a consistently-failing transition outranks a + rarely-failing one when both are penalised). + """ + partial = wilson_lower_bound(3, 10) + assert 0.0 < partial < 0.3 # conservative: below the 0.3 point estimate + # Monotonic in the failure count at fixed total + assert wilson_lower_bound(2, 10) < wilson_lower_bound(8, 10) + # A single failure in many co-occurrences is barely-confident (near 0) + assert wilson_lower_bound(1, 50) < 0.1 + # --------------------------------------------------------------------------- # Test: Intent clustering @@ -282,6 +299,121 @@ def test_different_turns_separate(self): assert ("A", "B", "ws1", 1) not in edges +# --------------------------------------------------------------------------- +# Test: failed_after edge computation (PRD-141 US-018) +# --------------------------------------------------------------------------- + + +class TestFailedAfterEdges: + """failed_after(A, B): A SUCCEEDED and a tool B within the next 2 steps in + the same session ERRORED. Tracks (failed, total) co-occurrence so confidence + is the Wilson lower bound of the failure rate, not a raw count. + """ + + def _make_log(self, action: str, status: str, turn_id: str = "t1", + agent_id: int = 1, workspace_id: str = "ws1", + offset_seconds: int = 0) -> Dict[str, Any]: + return { + "id": offset_seconds, + "action_name": action, + "agent_id": agent_id, + "workspace_id": workspace_id, + "status": status, + "user_query": f"do {action}", + "turn_id": turn_id, + "conversation_id": None, + "executed_at": datetime(2026, 1, 1) + timedelta(seconds=offset_seconds), + } + + def test_basic_failed_after(self): + """A succeeds, B errors right after → (A,B) = (1 failed, 1 total).""" + logs = [ + self._make_log("A", "success", offset_seconds=0), + self._make_log("B", "error", offset_seconds=1), + ] + failed = _compute_failed_after_edges(logs) + assert failed[("A", "B", "ws1", 1)] == (1, 1) + + def test_requires_a_to_succeed(self): + """If A did not succeed, no failed_after edge originates from it.""" + logs = [ + self._make_log("A", "error", offset_seconds=0), + self._make_log("B", "error", offset_seconds=1), + ] + failed = _compute_failed_after_edges(logs) + assert ("A", "B", "ws1", 1) not in failed + + def test_within_two_steps(self): + """B two steps after a successful A still counts; the gap tool that + succeeded does not produce a failed edge.""" + logs = [ + self._make_log("A", "success", offset_seconds=0), + self._make_log("C", "success", offset_seconds=1), + self._make_log("B", "error", offset_seconds=2), + ] + failed = _compute_failed_after_edges(logs) + assert failed[("A", "B", "ws1", 1)] == (1, 1) # B is 2 steps after A + assert failed[("C", "B", "ws1", 1)] == (1, 1) # B is 1 step after C + assert ("A", "C", "ws1", 1) not in failed # C succeeded, no failure + + def test_beyond_two_steps_excluded(self): + """A failure 3+ steps after A is not attributed to A.""" + logs = [ + self._make_log("A", "success", offset_seconds=0), + self._make_log("X", "success", offset_seconds=1), + self._make_log("Y", "success", offset_seconds=2), + self._make_log("B", "error", offset_seconds=3), + ] + failed = _compute_failed_after_edges(logs) + assert ("A", "B", "ws1", 1) not in failed # B is 3 steps after A + assert failed[("Y", "B", "ws1", 1)] == (1, 1) # adjacent + assert failed[("X", "B", "ws1", 1)] == (1, 1) # 2 steps + + def test_failure_rate_accumulates(self): + """Repeated A→B pairs build a (failed, total) rate across sessions.""" + logs = [] + # 3 turns where B errors after A + for i in range(3): + logs.append(self._make_log("A", "success", turn_id=f"t{i}", offset_seconds=i * 10)) + logs.append(self._make_log("B", "error", turn_id=f"t{i}", offset_seconds=i * 10 + 1)) + # 7 turns where B succeeds after A + for i in range(3, 10): + logs.append(self._make_log("A", "success", turn_id=f"t{i}", offset_seconds=i * 10)) + logs.append(self._make_log("B", "success", turn_id=f"t{i}", offset_seconds=i * 10 + 1)) + + failed = _compute_failed_after_edges(logs) + # 3 failures out of 10 co-occurrences + assert failed[("A", "B", "ws1", 1)] == (3, 10) + + def test_self_edge_skipped(self): + """A→A is never an edge, even when the second A errors.""" + logs = [ + self._make_log("A", "success", offset_seconds=0), + self._make_log("A", "error", offset_seconds=1), + self._make_log("B", "error", offset_seconds=2), + ] + failed = _compute_failed_after_edges(logs) + assert ("A", "A", "ws1", 1) not in failed + # First A succeeded; B errors 2 steps later → (A,B) counted once + assert failed[("A", "B", "ws1", 1)] == (1, 1) + + def test_no_failures_returns_empty(self): + """All-success sessions yield no failed_after edges.""" + logs = [ + self._make_log("A", "success", offset_seconds=0), + self._make_log("B", "success", offset_seconds=1), + ] + assert _compute_failed_after_edges(logs) == {} + + def test_different_turns_isolated(self): + """A success and a B failure in different sessions are not linked.""" + logs = [ + self._make_log("A", "success", turn_id="t1", offset_seconds=0), + self._make_log("B", "error", turn_id="t2", offset_seconds=1), + ] + assert ("A", "B", "ws1", 1) not in _compute_failed_after_edges(logs) + + # --------------------------------------------------------------------------- # Test: Affinity computation # --------------------------------------------------------------------------- From 310c3985fbec865fb095c32ab90a1a7e4f70cf42 Mon Sep 17 00:00:00 2001 From: Gerard Kavanagh Date: Fri, 29 May 2026 00:06:41 +0100 Subject: [PATCH 3/3] feat(routing): batched incremental tool-execution signal recorder (PRD-141 US-019) Fold tool success/failure outcomes into the tool-routing graph in real time via an in-process asyncio.Queue drained by a single long-lived background task. record() is a non-blocking put_nowait on the hot path; DB access happens only inside the drain loop, ONE session per flushed batch. This replaces the original draft's per-call asyncio.ensure_future, which opened a new DB session on every tool call and exhausted the connection pool under load. - config: 4 opt-in settings (recorder off by default, batch size, flush interval, queue maxsize) - signal_recorder: aggregate batch -> used_after/failed_after edges + agent_prefers/fails_for_intent affinities. NULL-safe upsert (UPDATE ... IS NOT DISTINCT FROM -> INSERT-if-rowcount-0) because the unique constraints lack NULLS NOT DISTINCT and intent_cluster_id is always NULL. Recorder increments sample_count/weight on edges but never overwrites confidence; nightly edge_builder remains the authoritative Wilson recompute. - tool_router: non-blocking enqueue at both the success path and the except block; failures in the recorder can never break a tool call. 7 new tests (12 in file, 72 across the routing corpus) green. --- orchestrator/config.py | 7 + .../tools/discovery/signal_recorder.py | 389 ++++++++++++++++++ orchestrator/modules/tools/tool_router.py | 36 ++ .../tests/test_graph_router_negative.py | 237 +++++++++++ 4 files changed, 669 insertions(+) create mode 100644 orchestrator/modules/tools/discovery/signal_recorder.py diff --git a/orchestrator/config.py b/orchestrator/config.py index fc6b6cb01..56126445c 100644 --- a/orchestrator/config.py +++ b/orchestrator/config.py @@ -528,6 +528,13 @@ def COORDINATOR_CONSISTENCY_CHECK(self) -> bool: TOOL_ROUTING_GRAPH_AGENT_SAMPLE_FLOOR: int = int(os.getenv("TOOL_ROUTING_GRAPH_AGENT_SAMPLE_FLOOR", "50")) EDGE_BUILDER_HOUR_UTC: int = int(os.getenv("EDGE_BUILDER_HOUR_UTC", "3")) EDGE_BUILDER_WINDOW_DAYS: int = int(os.getenv("EDGE_BUILDER_WINDOW_DAYS", "30")) + # PRD-141 US-019: batched incremental tool-execution signal recorder. + # Opt-in (default off). Drains an in-process queue with ONE DB session per + # flush — never a DB session or task per tool call. + TOOL_SIGNAL_RECORDER_ENABLED: bool = os.getenv("TOOL_SIGNAL_RECORDER_ENABLED", "false").lower() == "true" + TOOL_SIGNAL_FLUSH_BATCH_SIZE: int = int(os.getenv("TOOL_SIGNAL_FLUSH_BATCH_SIZE", "50")) + TOOL_SIGNAL_FLUSH_INTERVAL_SECONDS: float = float(os.getenv("TOOL_SIGNAL_FLUSH_INTERVAL_SECONDS", "5.0")) + TOOL_SIGNAL_QUEUE_MAXSIZE: int = int(os.getenv("TOOL_SIGNAL_QUEUE_MAXSIZE", "10000")) # ============================================================================= # AWS S3 VECTORS (PRD-42: Cloud Document Sync) diff --git a/orchestrator/modules/tools/discovery/signal_recorder.py b/orchestrator/modules/tools/discovery/signal_recorder.py new file mode 100644 index 000000000..2eebc51bf --- /dev/null +++ b/orchestrator/modules/tools/discovery/signal_recorder.py @@ -0,0 +1,389 @@ +""" +ToolSignalRecorder (PRD-141 US-019) +==================================== + +Batched, incremental tool-execution signal recorder for the tool routing graph. + +When a tool runs (success or failure), the ToolRouter enqueues a lightweight +signal onto an in-process ``asyncio.Queue``. A SINGLE background drain task +batches these and applies incremental upserts to ``tool_routing_edges`` / +``tool_routing_affinities`` using exactly ONE DB session per flush. + +Why batched (NOT fire-and-forget) +---------------------------------- +The original PRD-141 draft opened a DB session via ``asyncio.ensure_future`` on +*every* tool call, which exhausts the connection pool under load — the exact +failure mode Phase 1 fixes. This recorder NEVER opens a DB session per call and +NEVER creates a task per call: the drain task is a process singleton spawned +once, and ``record()`` only does a non-blocking ``put_nowait``. + +Division of labour with the nightly edge_builder +------------------------------------------------- +* This recorder gives intra-day freshness. It ACCUMULATES evidence + (``sample_count``, and edge ``weight`` as a raw count) in real time and sets a + conservative PROVISIONAL confidence on brand-new rows. +* ``core/services/edge_builder.py`` is authoritative: nightly it RECOMPUTES + weight + Wilson confidence from ``tool_execution_logs`` and SETs absolute + values. So the recorder never overwrites a row's confidence on update (nightly + owns it), and never inflates an ``agent_prefers`` weight (a normalized + frequency the nightly edge_builder owns) — on update of an affinity it only bumps + ``sample_count``. + +Null-safe upsert +---------------- +``uq_tre_full_key`` / ``uq_tra_full_key`` include nullable columns +(``workspace_id``, ``agent_id``, ``intent_cluster_id``). Postgres treats NULLs +as distinct, so a plain ``ON CONFLICT`` would NOT match a row whose scope column +is NULL and would insert a duplicate. The recorder always writes a NULL +``intent_cluster_id`` for affinities, so it uses ``UPDATE ... WHERE col IS NOT +DISTINCT FROM :col`` (NULL-safe equality) and only ``INSERT``s when no row +matched — guaranteeing "increment, no duplicate rows". + +Leaf-loadable: module-top imports are stdlib-only; config / DB / wilson are +imported lazily inside methods so this module can be unit-tested under a +synthetic package without the DB-backed executor chain (matches graph_router.py). +""" +from __future__ import annotations + +import asyncio +import logging +import threading +from dataclasses import dataclass +from datetime import datetime +from typing import Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class ToolSignal: + """One tool-execution outcome to fold into the routing graph.""" + + action_name: str + success: bool + agent_id: Optional[int] = None + workspace_id: Optional[str] = None + prior_action: Optional[str] = None + + +def _wilson(successes: int, total: int) -> float: + """Provisional confidence for brand-new rows (nightly edge_builder recomputes). + + Reuses the canonical Wilson lower bound rather than reimplementing it. + """ + from core.services.edge_builder import wilson_lower_bound + + return wilson_lower_bound(successes, total) + + +class ToolSignalRecorder: + """Process-singleton batched recorder. See module docstring.""" + + def __init__(self) -> None: + self._queue: Optional[asyncio.Queue] = None + self._drain_task: Optional[asyncio.Task] = None + + # ------------------------------------------------------------------ + # Config accessors (lazy — keep this module leaf-loadable) + # ------------------------------------------------------------------ + + @staticmethod + def _enabled() -> bool: + try: + from config import config + + return bool(getattr(config, "TOOL_SIGNAL_RECORDER_ENABLED", False)) + except Exception: + return False + + @staticmethod + def _batch_size() -> int: + try: + from config import config + + return int(getattr(config, "TOOL_SIGNAL_FLUSH_BATCH_SIZE", 50)) + except Exception: + return 50 + + @staticmethod + def _interval_seconds() -> float: + try: + from config import config + + return float(getattr(config, "TOOL_SIGNAL_FLUSH_INTERVAL_SECONDS", 5.0)) + except Exception: + return 5.0 + + @staticmethod + def _queue_maxsize() -> int: + try: + from config import config + + return int(getattr(config, "TOOL_SIGNAL_QUEUE_MAXSIZE", 10000)) + except Exception: + return 10000 + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def record(self, signal: ToolSignal) -> None: + """NON-BLOCKING enqueue from the tool hot path. + + No DB, no per-call task. Drops the signal silently if the recorder is + disabled, if there is no running event loop, or if the bounded queue is + full — telemetry is best-effort and must never block or fail a tool call. + """ + if not self._enabled(): + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return # not on an event loop -> skip + self._ensure_started(loop) + try: + self._queue.put_nowait(signal) + except asyncio.QueueFull: + logger.debug( + "ToolSignalRecorder: queue full, dropping signal for %s", + signal.action_name, + ) + + def _ensure_started(self, loop: asyncio.AbstractEventLoop) -> None: + """Create the queue and the SINGLE drain task, once. + + The drain task is spawned exactly once per loop (guarded), NOT per tool + call — this is the whole point of the batched design. + """ + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_maxsize()) + if self._drain_task is None or self._drain_task.done(): + self._drain_task = loop.create_task(self._drain_loop()) + + # ------------------------------------------------------------------ + # Background drain + # ------------------------------------------------------------------ + + async def _drain_loop(self) -> None: + while True: + try: + batch = await self._collect_batch() + if batch: + await self._flush(batch) + except asyncio.CancelledError: + raise + except Exception as e: # never let the drain loop die + self._record_flush_error(e) + await asyncio.sleep(1) + + async def _collect_batch(self) -> List[ToolSignal]: + """Block for the first signal, then drain up to batch_size or until + interval seconds elapse — whichever comes first.""" + batch_size = self._batch_size() + interval = self._interval_seconds() + + first = await self._queue.get() + batch: List[ToolSignal] = [first] + + loop = asyncio.get_running_loop() + deadline = loop.time() + interval + while len(batch) < batch_size: + remaining = deadline - loop.time() + if remaining <= 0: + break + try: + item = await asyncio.wait_for(self._queue.get(), timeout=remaining) + batch.append(item) + except asyncio.TimeoutError: + break + return batch + + # ------------------------------------------------------------------ + # Aggregation (pure) + flush (one session) + # ------------------------------------------------------------------ + + @staticmethod + def _aggregate( + batch: List[ToolSignal], + ) -> Tuple[Dict[tuple, int], Dict[tuple, int]]: + """Collapse a batch into incremental upsert counts (pure, no DB). + + Returns ``(edge_counts, affinity_counts)``: + edge_counts[(from_action, to_action, edge_type, agent_id, ws)] = inc + affinity_counts[(action_name, affinity_type, agent_id, ws)] = inc + + success -> used_after edge (prior->action) + agent_prefers affinity + failure -> failed_after edge (prior->action) + fails_for_intent affinity + + An edge is only produced when ``prior_action`` is present (an edge needs + two endpoints) and is not a self-transition (matches edge_builder); the + single-action affinity is always produced. + """ + edge_counts: Dict[tuple, int] = {} + aff_counts: Dict[tuple, int] = {} + for s in batch: + if s.success: + edge_type, affinity_type = "used_after", "agent_prefers" + else: + edge_type, affinity_type = "failed_after", "fails_for_intent" + + ak = (s.action_name, affinity_type, s.agent_id, s.workspace_id) + aff_counts[ak] = aff_counts.get(ak, 0) + 1 + + if s.prior_action and s.prior_action != s.action_name: + ek = (s.prior_action, s.action_name, edge_type, s.agent_id, s.workspace_id) + edge_counts[ek] = edge_counts.get(ek, 0) + 1 + + return edge_counts, aff_counts + + async def _flush(self, batch: List[ToolSignal]) -> None: + """Apply one batch with exactly ONE DB session.""" + edge_counts, aff_counts = self._aggregate(batch) + if not edge_counts and not aff_counts: + return + + now = datetime.utcnow() + try: + from core.database.database import get_db_session + + with get_db_session() as db: # exactly ONE session for the whole batch + for (from_action, to_action, edge_type, agent_id, ws), inc in edge_counts.items(): + self._upsert_edge(db, from_action, to_action, edge_type, ws, agent_id, inc, now) + for (action_name, affinity_type, agent_id, ws), inc in aff_counts.items(): + self._upsert_affinity(db, action_name, affinity_type, ws, agent_id, inc, now) + db.flush() + except Exception as e: + self._record_flush_error(e) + + # ------------------------------------------------------------------ + # Null-safe incremental upserts + # ------------------------------------------------------------------ + + @staticmethod + def _upsert_edge( + db, + from_action: str, + to_action: str, + edge_type: str, + workspace_id: Optional[str], + agent_id: Optional[int], + inc: int, + now: datetime, + ) -> None: + """Increment an existing edge or insert a new one. weight is a raw count + (matches edge_builder); confidence is left to the nightly recompute.""" + from sqlalchemy import text + + params = { + "from_action": from_action, + "to_action": to_action, + "edge_type": edge_type, + "workspace_id": workspace_id, + "agent_id": agent_id, + "inc": inc, + "now": now, + } + update_stmt = text(""" + UPDATE tool_routing_edges + SET sample_count = tool_routing_edges.sample_count + :inc, + weight = tool_routing_edges.weight + :inc, + last_updated = :now + WHERE from_action = :from_action + AND to_action = :to_action + AND edge_type = :edge_type + AND workspace_id IS NOT DISTINCT FROM :workspace_id + AND agent_id IS NOT DISTINCT FROM :agent_id + """) + result = db.execute(update_stmt, params) + if (getattr(result, "rowcount", 0) or 0) > 0: + return + + insert_stmt = text(""" + INSERT INTO tool_routing_edges + (from_action, to_action, edge_type, workspace_id, agent_id, + weight, confidence, sample_count, last_updated) + VALUES + (:from_action, :to_action, :edge_type, :workspace_id, :agent_id, + :inc, :confidence, :inc, :now) + """) + db.execute(insert_stmt, {**params, "confidence": _wilson(inc, inc)}) + + @staticmethod + def _upsert_affinity( + db, + action_name: str, + affinity_type: str, + workspace_id: Optional[str], + agent_id: Optional[int], + inc: int, + now: datetime, + ) -> None: + """Increment an existing affinity's sample_count or insert a new one. + + On update, weight + confidence are left untouched (the nightly edge_builder owns + them — agent_prefers is a normalized frequency that must not be inflated + by a raw real-time count). The recorder only ever writes a NULL + intent_cluster_id, so the match pins ``intent_cluster_id IS NULL``. + """ + from sqlalchemy import text + + params = { + "action_name": action_name, + "affinity_type": affinity_type, + "workspace_id": workspace_id, + "agent_id": agent_id, + "inc": inc, + "now": now, + } + update_stmt = text(""" + UPDATE tool_routing_affinities + SET sample_count = tool_routing_affinities.sample_count + :inc, + last_updated = :now + WHERE action_name = :action_name + AND affinity_type = :affinity_type + AND workspace_id IS NOT DISTINCT FROM :workspace_id + AND agent_id IS NOT DISTINCT FROM :agent_id + AND intent_cluster_id IS NULL + """) + result = db.execute(update_stmt, params) + if (getattr(result, "rowcount", 0) or 0) > 0: + return + + provisional = _wilson(inc, inc) + insert_stmt = text(""" + INSERT INTO tool_routing_affinities + (action_name, affinity_type, workspace_id, agent_id, + intent_cluster_id, weight, confidence, sample_count, last_updated) + VALUES + (:action_name, :affinity_type, :workspace_id, :agent_id, + NULL, :provisional, :provisional, :inc, :now) + """) + db.execute(insert_stmt, {**params, "provisional": provisional}) + + @staticmethod + def _record_flush_error(error: Exception) -> None: + try: + from core.utils.exception_telemetry import record_error + + record_error(subsystem="routing", operation="tool_signal_flush", error=error) + except Exception: + logger.warning("ToolSignalRecorder flush failed: %s", error) + + +# ====================================================================== +# Singleton factory +# ====================================================================== + +_instance_lock = threading.Lock() +_instance: Optional[ToolSignalRecorder] = None + + +def get_tool_signal_recorder() -> ToolSignalRecorder: + """Process-singleton factory (matches get_graph_router pattern).""" + global _instance + if _instance is not None: + return _instance + with _instance_lock: + if _instance is None: + _instance = ToolSignalRecorder() + return _instance diff --git a/orchestrator/modules/tools/tool_router.py b/orchestrator/modules/tools/tool_router.py index 53adcad81..34e14dcbc 100644 --- a/orchestrator/modules/tools/tool_router.py +++ b/orchestrator/modules/tools/tool_router.py @@ -29,6 +29,7 @@ from modules.tools.registry import ToolCategory, get_tool_registry as registry_get_tool_registry from modules.tools.execution import UnifiedToolExecutor from modules.tools.formatting.result_formatter import ToolResultFormatter +from modules.tools.discovery.signal_recorder import ToolSignal, get_tool_signal_recorder from core.database.database import SessionLocal # Capability-based filtering imports (PRD-37) @@ -608,6 +609,10 @@ async def execute_and_format( or bool(result.get("results")) ) + # PRD-141 US-019: fold this outcome into the routing graph via the + # batched recorder (non-blocking enqueue; no DB / no task per call). + self._record_tool_signal(tool_name, success, agent_id, workspace_id, caller_context) + if success: frontend_data = self.formatter.format_for_frontend(result, tool_name) llm_context = self.formatter.format_for_llm(result, tool_name) @@ -650,6 +655,8 @@ async def execute_and_format( else error_msg ) logger.error(f"[tool-trace {trace_id}] {tool_name} exception: {error_msg}") + # PRD-141 US-019: a thrown tool is a failure outcome too. + self._record_tool_signal(tool_name, False, agent_id, workspace_id, caller_context) return { "success": False, "frontend_data": {}, @@ -659,6 +666,35 @@ async def execute_and_format( "error_type": "dependency_missing" if fatal_error else None, } + @staticmethod + def _record_tool_signal( + tool_name: str, + success: bool, + agent_id: Optional[int], + workspace_id: Optional[UUID], + caller_context: Optional[Dict[str, Any]], + ) -> None: + """Enqueue a routing-graph signal. Best-effort: never raises into the + tool hot path. ``prior_action`` (the previous tool in the turn) is read + from caller_context when the caller threads it through.""" + try: + prior_action = ( + caller_context.get("prior_action") + if isinstance(caller_context, dict) + else None + ) + get_tool_signal_recorder().record( + ToolSignal( + action_name=tool_name, + success=bool(success), + agent_id=agent_id, + workspace_id=str(workspace_id) if workspace_id else None, + prior_action=prior_action, + ) + ) + except Exception: + pass # telemetry is best-effort; never break a tool call + def truncate_for_llm( self, result: Dict[str, Any], diff --git a/orchestrator/tests/test_graph_router_negative.py b/orchestrator/tests/test_graph_router_negative.py index b5a0c2201..76ad288fb 100644 --- a/orchestrator/tests/test_graph_router_negative.py +++ b/orchestrator/tests/test_graph_router_negative.py @@ -20,7 +20,9 @@ declarative Base), so ``_query_affinities`` runs against the REAL model with a fake DB session. """ +import asyncio import importlib.util +import math import sys import types from contextlib import contextmanager @@ -292,3 +294,238 @@ def test_failed_after_edge_not_expanded(): assert "next" in to_actions # used_after IS followed assert "bad" not in to_actions # failed_after is NOT followed assert len(edges) == 1 + + +# --------------------------------------------------------------------------- +# PRD-141 US-019: batched incremental tool-execution signal recorder +# --------------------------------------------------------------------------- +# +# signal_recorder.py is stdlib-only at module top, so we leaf-load it under the +# same synthetic package. _flush()/_upsert_*() lazily import +# core.database.database (the session) and core.services.edge_builder +# (wilson_lower_bound); both are injected as fakes per-test via monkeypatch so +# no DB creds are needed and there is no cross-file sys.modules pollution. + + +def _load_signal_recorder(): + full = f"{_PKG}.signal_recorder" + if full in sys.modules: + return sys.modules[full] + spec = importlib.util.spec_from_file_location( + full, _discovery_dir / "signal_recorder.py" + ) + module = importlib.util.module_from_spec(spec) + module.__package__ = _PKG + sys.modules[full] = module + spec.loader.exec_module(module) + return module + + +_signal_mod = _load_signal_recorder() +ToolSignalRecorder = _signal_mod.ToolSignalRecorder +ToolSignal = _signal_mod.ToolSignal + + +class _Result: + def __init__(self, rowcount: int): + self.rowcount = rowcount + + +class _CapturingDB: + """Records (sql, params) and returns a configurable rowcount for UPDATEs.""" + + def __init__(self, update_rowcount: int = 0): + self.executed = [] # list of (sql_text, params) + self._update_rowcount = update_rowcount + + def execute(self, stmt, params=None): + sql = str(stmt) + self.executed.append((sql, params or {})) + rc = self._update_rowcount if sql.strip().upper().startswith("UPDATE") else 1 + return _Result(rc) + + def flush(self): + pass + + +class _SessionFactory: + """Counts how many sessions are opened (must be exactly 1 per flush).""" + + def __init__(self, update_rowcount: int = 0): + self.db = _CapturingDB(update_rowcount=update_rowcount) + self.enter_count = 0 + + @contextmanager + def session(self): + self.enter_count += 1 + yield self.db + + +def _wilson_real(successes, total, z=1.96): + if total == 0: + return 0.0 + p = successes / total + denom = 1 + z**2 / total + centre = p + z**2 / (2 * total) + spread = z * math.sqrt((p * (1 - p) + z**2 / (4 * total)) / total) + return (centre - spread) / denom + + +def _recorder_with_fake_db(monkeypatch, update_rowcount: int = 0): + """A ToolSignalRecorder whose lazy core.* imports are faked.""" + factory = _SessionFactory(update_rowcount=update_rowcount) + + fake_db_mod = types.ModuleType("core.database.database") + fake_db_mod.get_db_session = factory.session + monkeypatch.setitem(sys.modules, "core.database.database", fake_db_mod) + + fake_eb = types.ModuleType("core.services.edge_builder") + fake_eb.wilson_lower_bound = _wilson_real + monkeypatch.setitem(sys.modules, "core.services.edge_builder", fake_eb) + + return ToolSignalRecorder(), factory + + +def _edge_stmts(db): + return [(s, p) for s, p in db.executed if "tool_routing_edges" in s] + + +def _aff_stmts(db): + return [(s, p) for s, p in db.executed if "tool_routing_affinities" in s] + + +def test_incremental_edge_update_success(monkeypatch): + """A success signal with a prior action -> used_after edge + agent_prefers + affinity. Fresh keys (no existing row) fall through to INSERT.""" + recorder, factory = _recorder_with_fake_db(monkeypatch, update_rowcount=0) + + asyncio.run( + recorder._flush( + [ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a")] + ) + ) + + edge_inserts = [(s, p) for s, p in _edge_stmts(factory.db) if "INSERT" in s.upper()] + assert len(edge_inserts) == 1 + _, ep = edge_inserts[0] + assert ep["from_action"] == "a" + assert ep["to_action"] == "b" + assert ep["edge_type"] == "used_after" + + aff_inserts = [(s, p) for s, p in _aff_stmts(factory.db) if "INSERT" in s.upper()] + assert len(aff_inserts) == 1 + _, ap = aff_inserts[0] + assert ap["action_name"] == "b" + assert ap["affinity_type"] == "agent_prefers" + + +def test_incremental_edge_update_failure(monkeypatch): + """A failure signal -> failed_after edge + fails_for_intent affinity.""" + recorder, factory = _recorder_with_fake_db(monkeypatch, update_rowcount=0) + + asyncio.run( + recorder._flush( + [ToolSignal("b", False, agent_id=1, workspace_id="ws", prior_action="a")] + ) + ) + + edge_inserts = [(s, p) for s, p in _edge_stmts(factory.db) if "INSERT" in s.upper()] + assert len(edge_inserts) == 1 + assert edge_inserts[0][1]["edge_type"] == "failed_after" + + aff_inserts = [(s, p) for s, p in _aff_stmts(factory.db) if "INSERT" in s.upper()] + assert len(aff_inserts) == 1 + assert aff_inserts[0][1]["affinity_type"] == "fails_for_intent" + + +def test_edge_upsert_increments_sample_count(monkeypatch): + """Repeated identical signals collapse to ONE upsert that INCREMENTS + sample_count (no duplicate rows). When the row already exists + (update_rowcount=1) only the UPDATE runs — never a second INSERT.""" + recorder, factory = _recorder_with_fake_db(monkeypatch, update_rowcount=1) + + asyncio.run( + recorder._flush( + [ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a") + for _ in range(3)] + ) + ) + + edge_stmts = _edge_stmts(factory.db) + updates = [(s, p) for s, p in edge_stmts if s.strip().upper().startswith("UPDATE")] + inserts = [(s, p) for s, p in edge_stmts if "INSERT" in s.upper()] + + # 3 dupes collapse to 1 update; row exists so NO insert (no duplicate edge) + assert len(updates) == 1 + assert len(inserts) == 0 + + sql, params = updates[0] + assert params["inc"] == 3 # aggregated count + assert "sample_count = tool_routing_edges.sample_count + :inc" in sql # increment + assert "IS NOT DISTINCT FROM" in sql # null-safe scope match + + +def test_flush_uses_single_session(monkeypatch): + """A mixed batch (multiple edges + affinities) opens exactly ONE DB session.""" + recorder, factory = _recorder_with_fake_db(monkeypatch, update_rowcount=0) + + batch = [ + ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a"), + ToolSignal("c", False, agent_id=1, workspace_id="ws", prior_action="b"), + ToolSignal("d", True, agent_id=2, workspace_id="ws2", prior_action="c"), + ] + asyncio.run(recorder._flush(batch)) + + assert factory.enter_count == 1 + + +def test_aggregate_collapses_and_skips_priorless_and_self_edges(): + """_aggregate is pure: dupes sum, missing/equal prior_action yields no edge + (affinity still emitted).""" + batch = [ + ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a"), + ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a"), + ToolSignal("x", True, agent_id=1, workspace_id="ws", prior_action=None), + ToolSignal("y", True, agent_id=1, workspace_id="ws", prior_action="y"), + ] + edges, affinities = ToolSignalRecorder._aggregate(batch) + + assert edges[("a", "b", "used_after", 1, "ws")] == 2 # dupes summed + # no edge for the prior-less signal or the self-transition + assert all(ek[1] != "x" for ek in edges) + assert ("y", "y", "used_after", 1, "ws") not in edges + # affinities are always produced (one per distinct action) + assert affinities[("b", "agent_prefers", 1, "ws")] == 2 + assert affinities[("x", "agent_prefers", 1, "ws")] == 1 + assert affinities[("y", "agent_prefers", 1, "ws")] == 1 + + +def test_record_is_noop_without_running_loop(monkeypatch): + """record() from a sync context (no event loop) must not raise and must not + create a queue/task — it silently drops.""" + monkeypatch.setitem( + sys.modules, "config", + types.ModuleType("config"), + ) + sys.modules["config"].config = SimpleNamespace(TOOL_SIGNAL_RECORDER_ENABLED=True) + + recorder = ToolSignalRecorder() + recorder.record(ToolSignal("b", True, agent_id=1, workspace_id="ws", prior_action="a")) + + assert recorder._queue is None + assert recorder._drain_task is None + + +def test_record_disabled_is_noop(monkeypatch): + """When the flag is off, record() does nothing even on an event loop.""" + monkeypatch.setitem(sys.modules, "config", types.ModuleType("config")) + sys.modules["config"].config = SimpleNamespace(TOOL_SIGNAL_RECORDER_ENABLED=False) + + recorder = ToolSignalRecorder() + + async def _drive(): + recorder.record(ToolSignal("b", True, agent_id=1)) + return recorder._queue, recorder._drain_task + + q, t = asyncio.run(_drive()) + assert q is None and t is None