Skip to content
69 changes: 69 additions & 0 deletions orchestrator/alembic/versions/prd142_wave0_error_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""PRD-142 Wave 0 US-001: error_events queryable sink

Append-only table backing the dashboard's "error rate by subsystem" tile.
Mirrors the PRD-008-A widget_event_log pattern: single table, JSONB
payload, two indexes that match the dashboard rollup queries.

Online-safe: creates a brand-new table only. No backfill, no NOT NULL
added to an existing large table, no data migration.

Standalone migration (down_revision = None) — the orchestrator alembic
config has many heads and this matches the established convention for
add-a-table changes (see add_job_title_to_agents.py).

Revision ID: prd142_wave0_error_events
Create Date: 2026-05-29
"""
from __future__ import annotations

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID


# Alembic revision identifiers for this migration module.
revision = "prd142_wave0_error_events"
# No parent revision: this migration is standalone (see the module docstring).
down_revision = None
# No branch label and no cross-revision dependency are declared.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the ``error_events`` table followed by its two composite indexes.

    Only a brand-new table is created here; no existing table is altered.
    """
    table_name = "error_events"

    columns = [
        sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True),
        # Required classification columns.
        sa.Column("subsystem", sa.String(64), nullable=False),
        sa.Column("operation", sa.String(128), nullable=False),
        # Optional error details and ownership context.
        sa.Column("error_type", sa.String(128), nullable=True),
        sa.Column("error_message", sa.String(500), nullable=True),
        sa.Column("workspace_id", PGUUID(as_uuid=True), nullable=True),
        sa.Column("agent_id", sa.Integer, nullable=True),
        sa.Column("action_name", sa.String(128), nullable=True),
        # JSONB payload; the server-side default is an empty JSON object.
        sa.Column(
            "event_data",
            JSONB,
            nullable=False,
            server_default=sa.text("'{}'::jsonb"),
        ),
        # Timestamp filled in by the database (no timezone flag is set).
        sa.Column(
            "created_at",
            sa.DateTime,
            nullable=False,
            server_default=sa.func.now(),
        ),
    ]
    op.create_table(table_name, *columns)

    # (index name, indexed columns) — created in this order.
    index_specs = (
        ("idx_error_events_subsystem_created", ["subsystem", "created_at"]),
        ("idx_error_events_workspace_created", ["workspace_id", "created_at"]),
    )
    for index_name, indexed_columns in index_specs:
        op.create_index(index_name, table_name, indexed_columns)


def downgrade() -> None:
    """Drop both ``error_events`` indexes, then drop the table itself."""
    table_name = "error_events"

    # Indexes are removed before the table, workspace index first.
    for index_name in (
        "idx_error_events_workspace_created",
        "idx_error_events_subsystem_created",
    ):
        op.drop_index(index_name, table_name=table_name)

    op.drop_table(table_name)
205 changes: 205 additions & 0 deletions orchestrator/api/analytics_real.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
AgentStatistics, SystemMetrics
)
from core.models.core import LLMUsage
from core.models.error_event import ErrorEvent
from core.models.orchestration import OrchestrationRun, OrchestrationTask
from core.models.orchestration_enums import RunState, TaskState, TERMINAL_RUN_STATES
from core.models.sites import Site
from core.models.widget_event_log import WIDGET_EVENT_TYPES, WidgetEventLog
from core.models.workspaces import Workspace
import logging
import psutil
import time
Expand Down Expand Up @@ -108,6 +112,207 @@ async def get_agent_success_rate(ctx: RequestContext = Depends(get_request_conte
logger.error(f"Error calculating success rate: {e}")
return {"value": 0, "trend": 0, "total_executions": 0, "successful_executions": 0, "error": str(e)}

def _parse_window(window: str) -> timedelta:
"""Parse a window string like '24h' or '7d' into a timedelta.

Supports ``h`` (hours) and ``d`` (days). Falls back to 24h on malformed
input — dashboard queries should not 400 because a UI sent a stale param.
"""
try:
if not window:
return timedelta(hours=24)
unit = window[-1].lower()
value = int(window[:-1])
if value <= 0:
return timedelta(hours=24)
if unit == "h":
return timedelta(hours=value)
if unit == "d":
return timedelta(days=value)
except (ValueError, IndexError):
pass
return timedelta(hours=24)


@router.get("/errors/by-subsystem")
async def get_errors_by_subsystem(
    window: str = "24h",
    ctx: RequestContext = Depends(get_request_context_hybrid),
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    """Error count by subsystem over a rolling window (PRD-142 Wave 0 US-002).

    Backs the dashboard "Error rate by subsystem" tile. Reads from the
    ``error_events`` table and filters by the caller's workspace.
    System-level rows (``workspace_id IS NULL``) are excluded from this
    workspace-scoped view by design — the tile shows per-tenant errors only.
    To keep that guarantee, a request context without a workspace id gets an
    empty result instead of being compared against NULL.

    The filter is on ``(workspace_id, created_at)``, the same column pair as
    ``idx_error_events_workspace_created``.

    Args:
        window: Rolling window such as ``"24h"`` or ``"7d"``; parsed by
            ``_parse_window``. A window too large to subtract from the
            current time is treated as 24h rather than failing the request.
        ctx: Authenticated request context; supplies ``workspace_id``.
        db: SQLAlchemy session.

    Returns:
        ``{window, total, by_subsystem: [{subsystem, count, rate}],
        generated_at}``. ``rate = count / total`` over the window.
        ``by_subsystem`` is ordered by count descending, then subsystem
        name, so the payload is deterministic. ``window`` echoes the
        request parameter as received.
    """
    now = datetime.utcnow()
    try:
        window_start = now - _parse_window(window)
    except OverflowError:
        # An absurdly large window cannot be represented; use the same
        # 24h fallback that malformed windows get.
        window_start = now - timedelta(hours=24)

    workspace_id = ctx.workspace_id
    if workspace_id is None:
        # ``column == None`` renders as IS NULL, which would return exactly
        # the system-level rows this endpoint promises to exclude.
        rows = []
    else:
        # NOTE(review): created_at is compared against a naive UTC value;
        # this assumes the database writes created_at in UTC — confirm.
        rows = (
            db.query(
                ErrorEvent.subsystem,
                func.count(ErrorEvent.id).label("count"),
            )
            .filter(
                ErrorEvent.workspace_id == workspace_id,
                ErrorEvent.created_at >= window_start,
            )
            .group_by(ErrorEvent.subsystem)
            .all()
        )

    # GROUP BY gives no ordering guarantee; sort for a stable response.
    counts = sorted(
        ((r.subsystem, int(r.count or 0)) for r in rows),
        key=lambda item: (-item[1], item[0]),
    )
    total = sum(count for _, count in counts)
    by_subsystem = [
        {
            "subsystem": subsystem,
            "count": count,
            "rate": (count / total) if total > 0 else 0,
        }
        for subsystem, count in counts
    ]

    return {
        "window": window,
        "total": total,
        "by_subsystem": by_subsystem,
        "generated_at": datetime.utcnow().isoformat(),
    }


@router.get("/widget-engagement")
async def get_widget_engagement(
    window: str = "7d",
    ctx: RequestContext = Depends(get_request_context_hybrid),
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    """Widget engagement counts by event_type + distinct sessions (PRD-142 Wave 0 US-004).

    Backs the dashboard "Widget engagement" tile. Read-only aggregation
    over ``WidgetEventLog``; this endpoint never constructs or writes
    ``WidgetEventLog`` rows.

    Tenant isolation: the caller's ``Site`` ids are resolved first and the
    aggregation is restricted to that set. A workspace with zero sites
    short-circuits to an empty payload.

    Both aggregations share one filter set: site membership,
    ``event_type IN WIDGET_EVENT_TYPES`` and ``created_at >= cutoff``.

    Args:
        window: Rolling window such as ``"7d"`` or ``"24h"``; parsed by
            ``_parse_window``. A window too large to subtract from the
            current time is treated as 24h rather than failing the request.
        ctx: Authenticated request context; supplies ``workspace_id``.
        db: SQLAlchemy session.

    Returns:
        ``{window, by_event_type: [{event_type, count}], sessions,
        generated_at}``. ``window`` echoes the request parameter as received.
    """
    now = datetime.utcnow()
    try:
        window_start = now - _parse_window(window)
    except OverflowError:
        # An absurdly large window cannot be represented; use the same
        # 24h fallback that malformed windows get.
        window_start = now - timedelta(hours=24)

    site_rows = (
        db.query(Site.id).filter(Site.workspace_id == ctx.workspace_id).all()
    )
    site_ids = [row[0] for row in site_rows]

    if not site_ids:
        return {
            "window": window,
            "by_event_type": [],
            "sessions": 0,
            "generated_at": datetime.utcnow().isoformat(),
        }

    # One definition of the filter keeps the per-type counts and the
    # distinct-session count looking at exactly the same rows.
    event_filters = (
        WidgetEventLog.site_id.in_(site_ids),
        WidgetEventLog.event_type.in_(WIDGET_EVENT_TYPES),
        WidgetEventLog.created_at >= window_start,
    )

    agg_rows = (
        db.query(
            WidgetEventLog.event_type,
            func.count(WidgetEventLog.id).label("count"),
        )
        .filter(*event_filters)
        .group_by(WidgetEventLog.event_type)
        .all()
    )
    by_event_type = [
        {"event_type": r.event_type, "count": int(r.count or 0)}
        for r in agg_rows
    ]

    # Distinct sessions across all event types, so this is not the sum of
    # any per-type figure.
    sessions = (
        db.query(func.count(func.distinct(WidgetEventLog.session_id)))
        .filter(*event_filters)
        .scalar()
    ) or 0

    return {
        "window": window,
        "by_event_type": by_event_type,
        "sessions": int(sessions),
        "generated_at": datetime.utcnow().isoformat(),
    }


@router.get("/activation")
async def get_activation(
    ctx: RequestContext = Depends(get_request_context_hybrid),
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    """Platform-wide activation rate (PRD-142 Wave 0 US-005).

    A workspace counts as "activated" when at least one
    ``OrchestrationRun`` row for it has
    ``state == RunState.COMPLETED.value``. The rate is the number of such
    distinct workspace ids divided by the number of ``Workspace`` rows.

    Unlike the other Wave 0 tiles, the queries here are NOT filtered by
    the caller's ``workspace_id`` — the figure is a platform-level
    aggregate. ``ctx`` is not read in the body; the dependency is declared
    so the request context must still be resolved before the handler runs.

    Only ``OrchestrationRun`` and ``Workspace`` are queried.

    NOTE(review): the numerator is counted from ``OrchestrationRun`` alone,
    so runs pointing at workspaces no longer present in ``Workspace`` could
    push the rate above 1 — confirm whether that can happen.
    NOTE(review): any caller that passes the context dependency can read
    this platform-wide aggregate — confirm that is the intended audience.

    Returns:
        ``{activated, total_workspaces, rate, generated_at}``; ``rate`` is
        0 when ``total_workspaces`` is 0.
    """
    completed_state = RunState.COMPLETED.value

    # Distinct workspaces that own at least one completed run.
    activated_query = db.query(
        func.count(func.distinct(OrchestrationRun.workspace_id))
    ).filter(OrchestrationRun.state == completed_state)
    activated = activated_query.scalar() or 0

    # Denominator: every workspace row.
    workspace_count_query = db.query(func.count(Workspace.id))
    total_workspaces = workspace_count_query.scalar() or 0

    if total_workspaces > 0:
        rate = activated / total_workspaces
    else:
        rate = 0

    generated_at = datetime.utcnow().isoformat()
    return {
        "activated": int(activated),
        "total_workspaces": int(total_workspaces),
        "rate": rate,
        "generated_at": generated_at,
    }


@router.get("/dashboard/task-completion-time")
async def get_avg_task_completion_time(ctx: RequestContext = Depends(get_request_context_hybrid), db: Session = Depends(get_db)) -> Dict[str, Any]:
"""Get average task completion time (UNION: workflows + missions)"""
Expand Down
70 changes: 70 additions & 0 deletions orchestrator/core/models/error_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
ErrorEvent ORM model (PRD-142 Wave 0 US-001).

Append-only queryable sink for ``record_error`` records. The
``automatos.errors`` logger emit is the source of truth for *log* sinks;
this table is the source of truth for *dashboard* rollups — specifically
the "error rate by subsystem" tile defined in PRD-142 Wave 0.

Pattern follows PRD-008-A ``WidgetEventLog`` / PRD-139 ``ToolExecutionLog``:
single table, JSONB payload, fire-and-forget writer that never propagates
failures (see ``core/utils/exception_telemetry.py``).
"""

from __future__ import annotations

from sqlalchemy import BigInteger, Column, DateTime, Index, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID
from sqlalchemy.sql import func

from core.database.base import Base


class ErrorEvent(Base):
    """ORM mapping for one row of the ``error_events`` table."""

    __tablename__ = "error_events"
    __table_args__ = (
        # Composite indexes pairing each grouping/filter column with the timestamp.
        Index("idx_error_events_subsystem_created", "subsystem", "created_at"),
        Index("idx_error_events_workspace_created", "workspace_id", "created_at"),
        {"extend_existing": True},
    )

    # Auto-incrementing surrogate key.
    id = Column(BigInteger, primary_key=True, autoincrement=True)

    # Broad area the failure came from, e.g. "memory", "tools", "harness".
    # This is the column the dashboard groups on.
    subsystem = Column(String(64), nullable=False)

    # Name of the individual operation that failed, e.g. "add_memory".
    operation = Column(String(128), nullable=False)

    # Exception class name. Left nullable so that an unusual ``error``
    # object can never be the reason a sink write fails to persist.
    error_type = Column(String(128), nullable=True)

    # Exception message, capped at 500 characters to bound storage and to
    # shield downstream JSON parsers from pathological strings.
    error_message = Column(String(500), nullable=True)

    # Workspace that owns the error when one is known; NULL for
    # system-level errors raised before a workspace context exists
    # (startup, cron, etc.).
    workspace_id = Column(PGUUID(as_uuid=True), nullable=True)

    # Agent that owns the error, when one is known.
    agent_id = Column(Integer, nullable=True)

    # Platform action involved in the failure, if there was one.
    action_name = Column(String(128), nullable=True)

    # Free-form extras from the caller (correlation IDs, tool/plumbing
    # context, ...). Intended to stay small (<2KB typical); the shape is
    # per-subsystem and deliberately loose.
    event_data = Column(JSONB, nullable=False, server_default="{}")

    # Set by the database at insert time.
    created_at = Column(DateTime, server_default=func.now(), nullable=False)

    def __repr__(self) -> str:
        """Return a compact debug string with id, subsystem, operation and type."""
        parts = (
            f"id={self.id}",
            f"subsystem={self.subsystem!r}",
            f"operation={self.operation!r}",
            f"type={self.error_type!r}",
        )
        return f"<ErrorEvent {' '.join(parts)}>"
Loading
Loading