Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,13 @@ def COORDINATOR_CONSISTENCY_CHECK(self) -> bool:
TOOL_ROUTING_GRAPH_AGENT_SAMPLE_FLOOR: int = int(os.getenv("TOOL_ROUTING_GRAPH_AGENT_SAMPLE_FLOOR", "50"))
EDGE_BUILDER_HOUR_UTC: int = int(os.getenv("EDGE_BUILDER_HOUR_UTC", "3"))
EDGE_BUILDER_WINDOW_DAYS: int = int(os.getenv("EDGE_BUILDER_WINDOW_DAYS", "30"))
# PRD-141 US-019: batched incremental tool-execution signal recorder.
# Opt-in (default off). Drains an in-process queue with ONE DB session per
# flush — never a DB session or task per tool call.
TOOL_SIGNAL_RECORDER_ENABLED: bool = os.getenv("TOOL_SIGNAL_RECORDER_ENABLED", "false").lower() == "true"
TOOL_SIGNAL_FLUSH_BATCH_SIZE: int = int(os.getenv("TOOL_SIGNAL_FLUSH_BATCH_SIZE", "50"))
TOOL_SIGNAL_FLUSH_INTERVAL_SECONDS: float = float(os.getenv("TOOL_SIGNAL_FLUSH_INTERVAL_SECONDS", "5.0"))
TOOL_SIGNAL_QUEUE_MAXSIZE: int = int(os.getenv("TOOL_SIGNAL_QUEUE_MAXSIZE", "10000"))

# =============================================================================
# AWS S3 VECTORS (PRD-42: Cloud Document Sync)
Expand Down
175 changes: 148 additions & 27 deletions orchestrator/core/services/edge_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class EdgeBuildSummary:
"""Summary returned after an edge-build run."""

edges_built: int = 0
failed_edges_built: int = 0
affinities_built: int = 0
intent_clusters: int = 0
logs_processed: int = 0
Expand Down Expand Up @@ -91,6 +92,13 @@ async def build_edges(window: timedelta = timedelta(days=30)) -> EdgeBuildSummar
edge_data = _compute_used_after_edges(logs)
summary.edges_built = _upsert_edges(db, edge_data)

# 2b. Compute failed_after edges (PRD-141 US-018): A succeeded then a
# tool within 2 steps errored. Same table, distinct edge_type;
# GraphRouter._query_edges only follows used_after, so these are
# recorded for analysis / de-ranking but never expanded into chains.
failed_data = _compute_failed_after_edges(logs)
summary.failed_edges_built = _upsert_failed_after_edges(db, failed_data)

# 3. Compute intent clusters from query embeddings
cluster_map = await _compute_and_upsert_clusters(db, logs)
summary.intent_clusters = len(cluster_map)
Expand All @@ -103,7 +111,8 @@ async def build_edges(window: timedelta = timedelta(days=30)) -> EdgeBuildSummar
summary.duration_ms = elapsed_ms

logger.info(
f"EdgeBuilder: built {summary.edges_built} edges, "
f"EdgeBuilder: built {summary.edges_built} used_after edges, "
f"{summary.failed_edges_built} failed_after edges, "
f"{summary.affinities_built} affinities across "
f"{summary.intent_clusters} intent clusters"
)
Expand Down Expand Up @@ -199,6 +208,67 @@ def _compute_used_after_edges(
return dict(edge_counts)


def _compute_failed_after_edges(
    logs: List[Dict[str, Any]],
) -> Dict[Tuple[str, str, Optional[str], Optional[int]], Tuple[int, int]]:
    """Tally failed_after(A, B) transitions from tool-execution logs.

    A pair (A, B) is counted whenever A has status "success" and B is one of
    the next two log entries in the same window with a different action name.
    Every such pairing increments the pair's total; pairings where B's status
    is anything other than "success" also increment its failure count. Keeping
    both numbers lets a caller score the failure RATE (failed / total) instead
    of a raw count.

    Logs are grouped by ``_derive_session_key``. Groups whose key starts with
    "agent:" are passed through ``_split_by_time_window`` with
    ``window_seconds=300``; every other group is treated as one window.

    Args:
        logs: Log dicts. Each entry is read for "status", "action_name",
            "workspace_id" and "agent_id".

    Returns:
        Mapping of (from_action, to_action, workspace_id, agent_id) to
        (failed_count, total_count), restricted to pairs with at least one
        failure. The scope fields are taken from the originating (A) log.
    """
    # Bucket logs per session, preserving their incoming order.
    by_session: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in logs:
        by_session[_derive_session_key(entry)].append(entry)

    failures: Dict[Tuple[str, str, Optional[str], Optional[int]], int] = defaultdict(int)
    pairings: Dict[Tuple[str, str, Optional[str], Optional[int]], int] = defaultdict(int)

    for session_key, entries in by_session.items():
        windows = (
            _split_by_time_window(entries, window_seconds=300)
            if session_key.startswith("agent:")
            else [entries]
        )

        for window_logs in windows:
            for pos, origin in enumerate(window_logs):
                # Only a successful A can originate a failed_after edge.
                if origin.get("status") != "success":
                    continue

                # The slice is the (at most) two entries that follow A in
                # this window; it is clipped at the window's end.
                for follower in window_logs[pos + 1:pos + 3]:
                    if origin["action_name"] == follower["action_name"]:
                        continue  # no self-edges

                    pair = (
                        origin["action_name"],
                        follower["action_name"],
                        origin.get("workspace_id"),
                        origin.get("agent_id"),
                    )
                    pairings[pair] += 1
                    if follower.get("status") != "success":
                        failures[pair] += 1

    # Emit only pairs that failed at least once, in first-seen order.
    result: Dict[Tuple[str, str, Optional[str], Optional[int]], Tuple[int, int]] = {}
    for pair, total in pairings.items():
        failed = failures.get(pair, 0)
        if failed > 0:
            result[pair] = (failed, total)
    return result


def _split_by_time_window(
logs: List[Dict[str, Any]], window_seconds: int = 300
) -> List[List[Dict[str, Any]]]:
Expand All @@ -216,11 +286,55 @@ def _split_by_time_window(
return windows


def _upsert_edge_row(
    db: Session,
    from_action: str,
    to_action: str,
    edge_type: str,
    workspace_id: Optional[str],
    agent_id: Optional[int],
    weight: float,
    confidence: float,
    sample_count: int,
    now: datetime,
) -> None:
    """Insert or refresh one row of tool_routing_edges.

    Issues a single INSERT ... ON CONFLICT ON CONSTRAINT uq_tre_full_key
    DO UPDATE statement. On conflict, weight, confidence, sample_count and
    last_updated are overwritten with the supplied values. The statement is
    executed on ``db``; this function does not flush or commit.

    Args:
        db: Session the statement is executed on.
        from_action: Source action of the edge.
        to_action: Destination action of the edge.
        edge_type: Value stored in the edge_type column.
        workspace_id: Workspace scope, or None.
        agent_id: Agent scope, or None.
        weight: Value stored in the weight column.
        confidence: Value stored in the confidence column.
        sample_count: Value stored in the sample_count column.
        now: Timestamp stored in last_updated.
    """
    # NOTE(review): workspace_id / agent_id may be None. Confirm that
    # uq_tre_full_key treats NULLs as equal; otherwise NULL-scoped rows will
    # never hit the ON CONFLICT branch and will accumulate duplicates.
    bind_values = {
        "from_action": from_action,
        "to_action": to_action,
        "edge_type": edge_type,
        "workspace_id": workspace_id,
        "agent_id": agent_id,
        "weight": weight,
        "confidence": confidence,
        "sample_count": sample_count,
        "last_updated": now,
    }
    upsert_sql = text("""
INSERT INTO tool_routing_edges
(from_action, to_action, edge_type, workspace_id, agent_id,
weight, confidence, sample_count, last_updated)
VALUES
(:from_action, :to_action, :edge_type, :workspace_id, :agent_id,
:weight, :confidence, :sample_count, :last_updated)
ON CONFLICT ON CONSTRAINT uq_tre_full_key
DO UPDATE SET
weight = :weight,
confidence = :confidence,
sample_count = :sample_count,
last_updated = :last_updated
""")
    db.execute(upsert_sql, bind_values)


def _upsert_edges(
db: Session,
edge_data: Dict[Tuple[str, str, Optional[str], Optional[int]], int],
) -> int:
"""Upsert edges into tool_routing_edges using ON CONFLICT UPDATE."""
"""Upsert used_after edges into tool_routing_edges."""
count = 0
now = datetime.utcnow()

Expand All @@ -230,32 +344,39 @@ def _upsert_edges(

weight = float(sample_count)
confidence = wilson_lower_bound(sample_count, sample_count)
_upsert_edge_row(
db, from_action, to_action, "used_after",
workspace_id, agent_id, weight, confidence, sample_count, now,
)
count += 1

# Use raw SQL for ON CONFLICT upsert
stmt = text("""
INSERT INTO tool_routing_edges
(from_action, to_action, edge_type, workspace_id, agent_id,
weight, confidence, sample_count, last_updated)
VALUES
(:from_action, :to_action, 'used_after', :workspace_id, :agent_id,
:weight, :confidence, :sample_count, :last_updated)
ON CONFLICT ON CONSTRAINT uq_tre_full_key
DO UPDATE SET
weight = :weight,
confidence = :confidence,
sample_count = :sample_count,
last_updated = :last_updated
""")
db.execute(stmt, {
"from_action": from_action,
"to_action": to_action,
"workspace_id": workspace_id,
"agent_id": agent_id,
"weight": weight,
"confidence": confidence,
"sample_count": sample_count,
"last_updated": now,
})
db.flush()
return count


def _upsert_failed_after_edges(
db: Session,
failed_data: Dict[Tuple[str, str, Optional[str], Optional[int]], Tuple[int, int]],
) -> int:
"""Upsert failed_after edges into tool_routing_edges.

weight = failure count; sample_count = total co-occurrences; confidence =
Wilson lower bound of the failure rate (failed / total). The _SAMPLE_FLOOR
is applied to the total co-occurrences, matching used_after's floor.
"""
count = 0
now = datetime.utcnow()

for (from_action, to_action, workspace_id, agent_id), (failed, total) in failed_data.items():
if total < _SAMPLE_FLOOR:
continue

weight = float(failed)
confidence = wilson_lower_bound(failed, total)
_upsert_edge_row(
db, from_action, to_action, "failed_after",
workspace_id, agent_id, weight, confidence, total, now,
)
count += 1

db.flush()
Expand Down
42 changes: 27 additions & 15 deletions orchestrator/modules/tools/discovery/graph_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _expand_with_graph(
all_action_names = set(entry_action_names)
for edge in edges:
all_action_names.add(edge["to_action"])
affinities = self._query_affinities(
positive_boosts, negative_penalties = self._query_affinities(
db, list(all_action_names),
agent_id if use_agent_scope else None,
)
Expand All @@ -204,12 +204,15 @@ def _expand_with_graph(
cosine = cosine_by_name.get(from_action, 0.0)
edge_confidence = edge["confidence"]

# Affinity boosts for the chain actions
# Affinity boosts (succeeds/prefers) lift the chain; negative
# penalties (fails_for_intent) lower it — PRD-141 US-017.
boost = 0.0
penalty = 0.0
for action in (from_action, to_action):
boost += affinities.get(action, 0.0)
boost += positive_boosts.get(action, 0.0)
penalty += negative_penalties.get(action, 0.0)

score = cosine * edge_confidence + boost
score = cosine * edge_confidence + boost - penalty
chains.append((from_action, score, [from_action, to_action]))
expanded += 1

Expand Down Expand Up @@ -288,13 +291,23 @@ def _query_affinities(
db,
action_names: List[str],
agent_id: Optional[int],
) -> dict:
"""Query tool_routing_affinities and return action_name -> total boost."""
) -> Tuple[dict, dict]:
"""Query tool_routing_affinities, returning (positive_boosts, negative_penalties).

PRD-141 US-017: positive and negative signals are kept in separate dicts
rather than netted into one, so the caller can apply them explicitly as
``score = cosine * edge_confidence + boost - penalty``.

* ``succeeds_for_intent`` / ``agent_prefers`` -> positive_boosts[action]
+= weight*confidence.
* ``fails_for_intent`` -> negative_penalties[action] += weight*confidence,
recorded as a POSITIVE magnitude (the caller subtracts it).
"""
from sqlalchemy import and_, or_
from core.models.tool_routing import ToolRoutingAffinity

if not action_names:
return {}
return {}, {}

filters = [
ToolRoutingAffinity.action_name.in_(action_names),
Expand All @@ -316,17 +329,16 @@ def _query_affinities(
.all()
)

boosts: dict = {}
positive_boosts: dict = {}
negative_penalties: dict = {}
for r in rows:
# succeeds_for_intent adds positive boost; fails subtracts
if r.affinity_type == "succeeds_for_intent":
boosts[r.action_name] = boosts.get(r.action_name, 0.0) + r.weight * r.confidence
magnitude = r.weight * r.confidence
if r.affinity_type in ("succeeds_for_intent", "agent_prefers"):
positive_boosts[r.action_name] = positive_boosts.get(r.action_name, 0.0) + magnitude
elif r.affinity_type == "fails_for_intent":
boosts[r.action_name] = boosts.get(r.action_name, 0.0) - r.weight * r.confidence
elif r.affinity_type == "agent_prefers":
boosts[r.action_name] = boosts.get(r.action_name, 0.0) + r.weight * r.confidence
negative_penalties[r.action_name] = negative_penalties.get(r.action_name, 0.0) + magnitude

return boosts
return positive_boosts, negative_penalties

# ------------------------------------------------------------------
# Helpers
Expand Down
Loading
Loading