diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 000000000..72b089595 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,635 @@ +# Orchestration Implementation Plan + +## Goal + +Land the full RFC vision of the Memoh orchestration layer. The current Postgres kernel is the foundation. The remaining work is to bring NATS event bus, blackboard, EnvSession runtime, resource accounting, and experience layer online so the system matches the architecture described in `RFC.md`. + +The orchestration layer should end up able to: + +- create durable runs from user goals +- plan/replan into DAGs through planner epochs +- dispatch task-scoped attempts with claim/lease fencing +- ingest worker and env facts through JetStream +- emit committed run events through a durable outbox +- coordinate task-scoped state through a reconstructable blackboard +- lease, snapshot, and recover external environment sessions +- account for resources through quota/ledger/holds +- promote verified outcomes into a long-term experience store +- present all of this through an operator-grade web UI + +This plan sequences that work in stages. Each stage is independently shippable and leaves the system in a correct state. 
+ +## Current Repo Reality + +### Implemented Packages And Entrypoints + +- `internal/orchestration` + - core service + - run/task/attempt/checkpoint/event state machine + - planner intent processing + - scheduler/dispatch functions + - worker/verifier claim and lease fencing + - verification runtime + - executor contract interfaces + - service, integration, contract, and blackbox tests +- `internal/orchestrationexec` + - LLM planner + - LLM replanner + - LLM worker runtime + - LLM verifier runtime + - strict JSON decoders and prompt builders +- `cmd/workerd` + - always-on worker daemon + - claims ready attempts + - heartbeats worker/attempt leases + - submits completed/failed results +- `cmd/verifyd` + - always-on verifier daemon + - claims verification work + - heartbeats verifier leases + - submits verification decisions +- `internal/agent/tools/orchestration.go` + - user bot tool integration for starting/querying/cancelling orchestration runs +- `internal/handlers/orchestration.go` + - HTTP API used by frontend and tests +- `apps/web/src/pages/orchestration` + - orchestration page, DAG view, run selector, inspector, thinking/activity, outputs, checkpoints + +There is no `cmd/orchestrator` in this branch. The server process owns orchestration control-plane state, while `workerd` and `verifyd` are separate execution daemons. 
+ +### Implemented Data Model + +Current durable tables cover: + +- `orchestration_runs` +- `orchestration_planning_intents` +- `orchestration_tasks` +- `orchestration_task_dependencies` +- `orchestration_task_attempts` +- `orchestration_task_results` +- `orchestration_input_manifests` +- `orchestration_events` +- `orchestration_artifacts` +- `orchestration_workers` +- `orchestration_task_verifications` +- `orchestration_human_checkpoints` +- `orchestration_projection_snapshots` +- `orchestration_idempotency_records` +- `orchestration_action_ledger` + +Current durable model does not yet include: + +- `orchestration_env_resources` +- `orchestration_env_sessions` +- `orchestration_env_bindings` +- `orchestration_env_snapshots` +- `orchestration_resource_quotas` +- `orchestration_resource_ledger` +- `orchestration_resource_holds` +- `orchestration_experience_records` + +## Confirmed Decisions + +- Postgres remains the only authoritative orchestration state source. NATS, blackboard, and artifact stores are delivery and projection layers, not truth. +- `workerd` and `verifyd` stay as always-on Docker Compose services through Stage 1. On-demand scheduling can be revisited after JetStream consumers are in place. +- NATS JetStream becomes the runtime delivery backbone in Stage 1. Existing Compose `nats` service is repurposed as the actual event bus. +- NATS KV blackboard becomes the runtime shared view in Stage 2. It must be reconstructable from Postgres at any time. +- EnvResource/EnvSession/EnvBinding/EnvSnapshot become first-class state in Stage 3, not ad-hoc fields on tasks. +- Resource quotas, ledger, and holds become authoritative in Stage 4 before any non-trivial multi-tenant capacity work. +- Experience records, promotion pipeline, and retrieval hooks become real in Stage 5. +- Frontend visibility for each stage's new state is part of that stage's done criteria, not a separate effort. 
+ +## Current Runtime Model + +```mermaid +flowchart TB + Bot[User Bot] --> Tool[orchestration tool] + Tool --> API[HTTP handlers] + API --> Service[internal/orchestration Service] + Service --> PG[(Postgres)] + + Service --> Planner[orchestrationexec planner/replanner] + Workerd[cmd/workerd] --> Service + Workerd --> WorkerRuntime[orchestrationexec worker runtime] + Verifyd[cmd/verifyd] --> Service + Verifyd --> VerifierRuntime[orchestrationexec verifier runtime] + + Planner --> Agent[existing Agent runtime] + WorkerRuntime --> Agent + VerifierRuntime --> Agent + + Service -.committed events.-> Outbox[orchestrationoutbox dispatcher] + Outbox -.run.event.*.-> Bus[(NATS JetStream)] + Workerd -.attempt.fact.*.-> Bus + Verifyd -.verification.fact.*.-> Bus + Bus -.WatchRun subscribe.-> Service + Bus -.attempt.fact.* subscribe.-> Facts[orchestrationfacts consumer] + Facts -.validate.-> PG + + Web[Web orchestration UI] --> API +``` + +The dotted edges are the Stage 1 surface: Postgres still owns truth, but the outbox and the daemons publish through the bus so live consumers (`WatchRun`, future fact consumer) do not have to poll. + +## State Machines To Preserve + +### Run + +- `created` +- `running` +- `waiting_human` +- `cancelling` +- terminal: `completed`, `failed`, `cancelled` + +`planning_status`: + +- `idle` +- `active` + +### Task + +- `created` +- `ready` +- `dispatching` +- `running` +- `verifying` +- `waiting_human` +- terminal: `completed`, `failed`, `blocked`, `cancelled` + +### Attempt + +- `created` +- `claimed` +- `running` +- terminal: `completed`, `failed`, `timed_out`, `cancelled`, `lost` + +### Verification + +- claim/start/finalize flow is fenced like attempts. +- verifier decisions map to pass/retry/replan/fatal style behavior. + +### HumanCheckpoint + +- `open` +- `resolved` +- `timed_out` +- `cancelled` +- `superseded` + +Timeout rules: + +- `timeout_at` requires a valid `default_action`. 
+- timeout recovery marks the checkpoint `timed_out` +- timeout recovery emits `run.event.hitl.timed_out` +- timeout recovery enqueues `checkpoint_resume` + +## Completed Work + +### Kernel + +- Durable schema and `sqlc` integration. +- Root run creation with eager root task. +- Task dependencies and topological dispatch. +- Planning intents: + - `start_run` + - `checkpoint_resume` + - `attempt_finalize` + - `replan` +- Planner epoch and append/supersede semantics. +- Idempotency records for public control paths. +- Projection snapshots used by current read APIs/UI. + +### Planner / Replanner + +- LLM planner for `start_run`. +- LLM replanner for: + - `InjectRunHint(replan_request)` + - worker `request_replan` + - verifier replan requests +- Strict output contract: + - unknown keys rejected + - legacy aliases rejected + - empty replacement plans rejected + - invalid priorities rejected + - DAG validation before persistence +- Runtime limits through `control_policy.runtime_limits`: + - child tasks per plan + - dependency edges per plan + - total tasks per run + - replans per run + - replan depth + - task goal length + +### Executor / Worker / Verifier + +- Worker registration and heartbeat. +- Attempt claim/start/heartbeat/finalize. +- Verification claim/start/finalize. +- Claim token and claim epoch fencing. +- Lease expiry recovery. +- Retry policy integration. +- Completion replay protection. +- Replay conflict surfaced as non-retryable to daemons. +- `workerd` / `verifyd` run as separate Compose services. + +### HITL + +- Create checkpoint. +- Resolve checkpoint. +- Checkpoint resume planning intent. +- Timeout/default-action recovery. +- Integration test coverage for timeout/default-action resume. + +### Frontend + +- Orchestration page exists and is navigable. +- DAG is custom, not VueFlow. +- Root task is shown as L0. +- Child tasks are topologically layered. 
+- Node inspector can show thinking/activity, task details, inputs, outputs, logs, checkpoints, artifacts. +- Stop run UI exists. +- Many internal/debug strings have been hidden or localized. +- Unused VueFlow components and dependency have been removed. + +### Tests + +Current coverage includes: + +- service/unit tests for state-machine and contract behavior +- integration tests with PostgreSQL +- blackbox tests through HTTP and independent `workerd` / `verifyd` +- planner/replanner decoder tests +- LLM worker/verifier runtime tests +- migration up/down checks in CI + +## Stage 0. Stabilize The Current Kernel + +The existing Postgres kernel is the substrate every later stage depends on. Before adding JetStream or blackboard, lock down recovery, planner hardening, HITL semantics, and UI honesty so later stages do not paper over current bugs. + +### 0.1 Executor And Recovery Closure + +- Expand crash and lease-loss tests around: + - completion commit failed after worker exit + - ack lost after DB commit + - worker restarted with same attempt context + - worker submits after lease expiry + - verifier equivalents +- Make recovery distinguish four classes explicitly: safe replay, retry budget available, hard replay conflict, stale writer. +- Keep `workerd` and `verifyd` retry loops bounded; replay conflicts stop locally without spinning. +- Add code comments to executor contract types so external executors can later be plugged in without re-reading state machine internals. + +Done when stale writers cannot mutate terminal state, safe replay does not duplicate results or artifacts, and retry budget consistently produces the next attempt. + +### 0.2 Planner / Replanner Hardening + +- Add tests for: dependency cycle, unknown alias reference, over-limit total task count, over-limit replan count, over-limit replan depth, planner failure paths, and non-root replan failure isolation. +- Confirm planner/replanner prompt language matches user-visible result language. 
+- Keep prompts schema-first; remove any unused or aspirational fields. + +Done when invalid planner output always fails the planning intent cleanly, no invalid DAG rows are persisted, and UI never has to render a raw planner payload by default. + +### 0.3 HITL Barrier Semantics + +- Lock down task-level vs run-level checkpoint behavior. +- Add tests for multiple concurrent checkpoints in a single run. +- Add tests for resolving stale or superseded checkpoints. +- Implement run-wide retirement barrier so siblings are properly parked or cancelled when `blocks_run = true`. +- Persist the chosen barrier source through `(planner_epoch, id)` ordering so recovery is deterministic. + +Done when run-wide checkpoints behave per `RFC.md` invariants, and any sibling attempts are retired before the run is considered paused. + +### 0.4 Observability And UI Honesty + +- Normalize event/activity summaries for human reading. +- Keep activity rows short, expandable for details. +- Show tool and action result summaries before raw JSON. +- Distinguish task output, attempt logs, verifier notes, checkpoints, and artifacts in the inspector. +- Hide internal booleans and backend enums behind translated labels by default. + +Done when a reviewer can read run state from the UI without consulting server logs. + +### 0.5 Stabilization Hygiene + +- Backend regression: `go test ./cmd/workerd ./cmd/verifyd ./internal/orchestration ./internal/orchestrationexec -count=1` plus DB-backed integration/blackbox tests with `TEST_POSTGRES_DSN`. +- Migration up/down both pass. +- Frontend builds in a clean `pnpm` environment. +- Generated artifacts (`sqlc`, swagger, SDK) are consistent with code. +- No unrelated local files (`.codex`, blackbox bin directories, backup tarballs) are committed. + +## Stage 1. NATS Event Bus + +Goal: separate fact ingress from committed run events and make `attempt.fact.*` and `run.event.*` first-class delivery channels, while keeping Postgres authoritative. 
+ +### 1.1 Stream Layout + +- [x] Provision JetStream streams for: + - `memoh.orch.run.event.*` (committed outbox) + - `memoh.orch.attempt.fact.*` (worker/verifier fact ingress) +- [x] Subject helpers in `internal/orchestrationbus/subjects.go` build per-run / per-attempt subjects so consumers can filter without parsing payloads. +- [x] Stream defaults: file storage, `LimitsPolicy`, 24h retention for run events, 1h for attempt facts, MsgID-based deduplication windows. +- [x] Subject constants, envelope schemas, and bus interface live in `internal/orchestrationbus`. The package ships an `InMemoryBus` (single-process / tests) and a `JetStreamBus`. +- [ ] Pending: `env.fact.*` and `artifact.intent.*` streams ship with Stage 3. + +### 1.2 Outbox Dispatcher + +- [x] `internal/orchestrationoutbox` polls `orchestration_events WHERE published_at IS NULL`, publishes each row through the bus, and stamps `published_at` once accepted. +- [x] Partial index `idx_orchestration_events_unpublished` (`db/postgres/migrations/0081_*`) keeps the scan cheap. +- [x] JetStream `WithMsgID(event_id)` means redelivery after a crash never duplicates events for subscribers. +- [x] `Service.SetEventCommittedHook(dispatcher.Notify)` is registered in `cmd/agent`; the dispatcher's `Notify()` channel is exercised by tests. Wiring `notifyEventCommitted()` into every kernel commit path is tracked as a small follow-up; the polling fall-back already keeps publish latency under one tick. + +### 1.3 Fact Ingress + +- [x] `cmd/workerd` connects to the configured bus and emits `attempt.claimed`, `attempt.started`, `attempt.start_failed`, `attempt.completed`, `attempt.failed`, `attempt.lost` envelopes per attempt lifecycle. +- [x] `cmd/verifyd` emits the equivalent `verification.*` facts. +- [x] Both daemons fall back to the in-process bus when `[nats].url` is empty so single-process tests keep working. 
+- [x] `internal/orchestrationfacts` runs a kernel-side consumer that subscribes globally to every `attempt.fact.*` envelope, validates `claim_epoch` / `claim_token` against Postgres, and surfaces structured outcomes (`accepted`, `stale`, `orphan`, `mismatch`, `invalid`) for operators. The consumer is observe-only at this stage; Postgres remains the authoritative state machine, but the validator is in place so the next iteration can promote it to a control-plane writer without changing the envelope schema. +- [ ] Pending: have the kernel apply observed facts as state transitions instead of the daemons committing through direct service calls. This is gated on Stage 2 blackboard CAS so the control path can move off the direct daemon → kernel calls without losing fencing guarantees. + +### 1.4 Public Event API Refactor + +- [x] `WatchRun` subscribes to the bus (`orchestrationbus.Bus.SubscribeRunEvents`) when one is configured. The handler subscribes before reading the snapshot so live events cannot slip past the backfill window. +- [x] After backfill the watcher forwards bus envelopes, dedupes against `afterSeq`, and reconciles against Postgres every 5s so a transient bus interruption never silently strands a subscriber. +- [x] Polling fall-back kept for deployments where `[nats].url` is empty. +- [x] Web UI consumes `/orchestration/runs/{run_id}/watch` over fetch-based SSE (`apps/web/src/pages/orchestration/composables/use-run-event-stream.ts`). Each committed event triggers a 250ms-debounced inspector refetch; the legacy poll loop drops to a 5s safety-net interval that only fires when the stream is not in `open` state. Reconnect uses an exponential backoff capped at 8s and resumes from the last seen `seq`. 
+ +### 1.5 Tests And Operations + +- [x] Unit tests for the bus (`internal/orchestrationbus/inmem_test.go`, `subjects_test.go`) and the dispatcher (`internal/orchestrationoutbox/outbox_test.go` covering happy path, malformed-row poison-pill, publish failure, notify wakeup). +- [x] Integration test `TestIntegrationWatchRunDeliversEventsThroughBusOutbox` runs a real Postgres + the dispatcher + an `InMemoryBus` and asserts that `WatchRun` delivers events end-to-end and the dispatcher drains `published_at` to zero. +- [ ] Pending: integration test against an actual JetStream instance (Docker compose `nats` already exposes one), and a blackbox test that exercises a full run with `workerd` / `verifyd` reaching the kernel only via bus once the server-side fact consumer lands. +- [ ] Pending: short operational doc covering stream inspection, draining stuck consumers, and replaying from a `seq`. + +Done when `workerd`/`verifyd` drive runtime through JetStream, run timelines are reproducible from Postgres replay, and JetStream loss only delays delivery without changing committed truth. + +## Stage 2. Blackboard + +Goal: introduce a reconstructable shared runtime view with explicit ownership and CAS rules, and remove the implicit pattern of stuffing runtime state into ad-hoc rows. + +### 2.1 Schema And Namespaces + +- [x] Defined KV layout in `internal/orchestrationblackboard/keys.go`: + - `bb.run.{run_id}.{namespace}.{...}` for run-scoped values + - `bb.task.{task_id}.{namespace}.{...}` for task-scoped values +- [x] Each `Value` envelope carries `schema_version`, `writer_type`, `writer_id`, `attempt_id`, `claim_epoch`, `updated_at`, `persistence_class`, and `payload` per `RFC.md`. +- [x] Locked-down namespaces (`internal/orchestrationblackboard/keys.go::Namespace*`): `context`, `plan`, `deps`, `progress`, `result`, `artifacts`, `verifier`, `human`. 
+ +### 2.2 Writer Ownership And CAS + +- [x] `internal/orchestrationblackboard.Writer` enforces scope rules: workers may only write their own task scope, verifiers may only write the verifier namespace, the orchestrator may write any scope. +- [x] `result.*` writes go through `Writer.CompareAndSwap`, which preserves KV revision CAS and rejects writes whose `claim_epoch` is below the value already on the blackboard. +- [x] Stale writers (lost lease, superseded planner epoch, terminal task) are rejected at the blackboard layer through `ErrStaleWriter` / `ErrUnauthorisedWriter`, not just at Postgres. +- [x] Two backends behind the same `Store` interface: `InMemoryStore` (single-process / tests) and `JetStreamStore` on top of NATS KV. The factory in `internal/orchestrationblackboard/factory.go` mirrors `orchestrationbus` and falls back to in-memory when `[nats].url` is empty. + +### 2.3 Frozen Input Manifest Integration + +- [x] Dispatch captures the live blackboard view through `Service.captureBlackboardRevisions` (`internal/orchestration/blackboard.go`) and writes the snapshot into `orchestration_input_manifests.captured_blackboard_revisions` before hashing the manifest. +- [x] Captured revisions cover the run-scope context plus every direct predecessor task's `result.*` keys, so verifier replay can later reach the same view through `GetRevision`. +- [x] When the kernel runs without a wired blackboard store the dispatch path stays identical to its pre-Stage-2 shape, keeping tests and single-process deploys unchanged. +- [x] `Service.publishTaskCompletionToBlackboard` writes `bb.task.{task_id}.result.summary` after the Postgres commit succeeds, with CAS so repeated commits stay idempotent. 
+ +### 2.4 Rebuild Path + +- [x] `Service.RebuildBlackboard(ctx, caller, runID)` (`internal/orchestration/blackboard.go`) walks `orchestration_runs` for run-scope context and `orchestration_task_results` joined with the producing attempt for `result.*` summaries, writing each entry through the orchestrator-class `Writer`. +- [x] Admin entry point lives at `POST /orchestration/runs/{run_id}/blackboard/rebuild` (declared on `orchestrationAPI`, surfaced through the regenerated `@memohai/sdk` as `postOrchestrationRunsByRunIdBlackboardRebuild`). +- [x] Recovery on orchestrator/server restart never assumes blackboard contents are intact; if KV is missing, callers invoke rebuild and the kernel keeps serving Postgres truth in the meantime. +- [ ] Pending: structured artifacts (`bb.task.*.artifacts.*`) and verifier notes (`bb.task.*.verifier.*`) are not yet rebuilt because they require artifact-store / verifier persistence work that lands with later stages. + +### 2.5 Tests + +- [x] Unit tests for envelope encoding (`value_test.go`), key validation (`keys_test.go`), in-memory CAS / stale-writer rejection (`inmem_test.go`, `writer_test.go`), and JetStream KV behaviour gated on `TEST_NATS_URL` (`jetstream_test.go`). +- [x] `TestIntegrationBlackboardCaptureAndPublish` drives a producer/consumer DAG end-to-end and asserts both publish and capture paths. +- [x] `TestIntegrationRebuildBlackboardRecoversAfterStoreWipe` wipes the in-memory store mid-run and asserts the rebuild path restores the runtime view from Postgres. +- [x] `TestIntegrationRebuildBlackboardWithoutStoreReturnsConfiguredFalse` pins the no-store contract for deploys without NATS. +- [ ] Pending: a JetStream-backed blackbox test that wipes the live KV bucket mid-run; gated on the same JetStream blackbox harness Stage 1 still owes. + +Done when blackboard is observable, writer ownership is enforced, and the system tolerates KV loss without losing committed state. 
The kernel-side contract is in place; the remaining bullets are JetStream-backed end-to-end coverage that will land alongside the Stage 1 blackbox follow-up. + +## Stage 3. Env Session Runtime + +Goal: bring external environment state (browser context, desktop, phone, container) into a first-class lease/snapshot model so tasks can resume across attempts and side effects can be audited. + +### 3.1 Schema + +- Add tables and `sqlc` queries for: + - `orchestration_env_resources` + - `orchestration_env_sessions` + - `orchestration_env_lease_reservations` + - `orchestration_env_bindings` + - `orchestration_env_snapshots` + +### 3.2 Env Manager Interface + +- Add `internal/env` with a backend-neutral `EnvManager` per `RFC.md`. +- Provide initial backends: + - `internal/env/container` (current containerd workspace as an EnvResource) + - `internal/env/browser` (browser gateway sessions) +- Implement reserve/commit/abort, bind, hold, snapshot, reset, release. +- Plumb env fences (`env_lease_epoch`, `env_lease_token`) through worker writes and side-effect execution. + +### 3.3 Action Ledger Wiring + +- Extend `orchestration_action_ledger` writes to be the authoritative log for env actions. +- Classify actions by `effect_class`: env_local_mutation, external_read, external_write, external_irreversible. +- Snapshot before/after env state for non-trivial actions. + +### 3.4 HITL And Approval Tokens + +- Hold env sessions across checkpoint waits when `resume_policy.resume_mode = resume_held_env`. +- Side-effect approval tokens bind to `(attempt_id, claim_epoch, env_session_id, env_lease_epoch)`. +- Enforce that `external_irreversible` actions require an approval token plus matching fences. + +### 3.5 Drift Detection + +- Workers periodically observe env snapshots. +- Verifier compares before/after snapshots against task expectations. +- Drift outcomes feed into retry/replan/HITL policy. 
+ +Done when an interactive task can pause for human input, hold its env session, resume in a new attempt, and side effects are audit-traceable. + +## Stage 4. Resource Accounting + +Goal: replace implicit capacity assumptions with explicit quota, ledger, and admission control so the system can be operated with predictable headroom. + +### 4.1 Schema + +- Add tables and `sqlc` queries for: + - `orchestration_resource_quotas` + - `orchestration_resource_ledger` + - `orchestration_resource_holds` + - `orchestration_artifact_reservations` + +### 4.2 Admission + +- Scheduler admission consults quota, current holds, and worker profile concurrency before claim. +- Held env sessions count toward owner quota until released or reclaimed. +- Tasks blocked on admission stay `ready` and enter an admission queue. + +### 4.3 Reclaim And Fairness + +- Implement reclaim for: idle reservations, held env sessions over TTL, parked attempts beyond hold TTL. +- Default fairness uses per-tenant weighted scheduling, then priority/age within tenant. +- All reclaim/expire/preempt events write to `orchestration_resource_ledger`. + +### 4.4 Tests + +- Property-style tests for ledger invariants (sum of holds equals active reservations). +- Integration tests for admission under contention. +- Blackbox test that runs hit quota and recover after release. + +Done when capacity decisions are auditable from `orchestration_resource_ledger` alone. + +## Stage 5. Experience Layer + +Goal: turn verified outcomes and useful failure recoveries into a structured experience store that planner, scheduler, worker, and verifier can consult. + +### 5.1 Schema + +- Add tables for `orchestration_experience_records` and `orchestration_experience_feedback`. +- Define structured fields per `RFC.md`: `kind`, `scope`, `worker_profile`, `evidence_refs`, `structured_data`, `confidence`, `verified`, `version`. 
+ +### 5.2 Promotion Pipeline + +- Implement extractor, verifier, deduper, scorer, and promoter stages over completed runs. +- Inputs: verified artifacts, run summary, failure/retry records, verifier outputs, human checkpoint resolutions. +- Output: promotable experience records with evidence references back to the source run. + +### 5.3 Retrieval Hooks + +- Planner consults experience for decomposition templates and worker profile recommendations. +- Scheduler consults experience for timeout/retry tuning. +- Verifier consults experience for known-good and known-bad signatures. + +### 5.4 Governance + +- Tool persistence policy (`ToolPersistencePolicy`) enforced at tool registration. +- `external_side_effect` outputs never auto-promote. +- Versioning, retention, and rollback for experience entries. + +Done when re-running a similar goal produces measurable benefit from the experience store and degraded entries can be pruned without losing audit trail. + +## Test Matrix + +Each stage extends this matrix. Earlier stages must keep passing as later stages land. 
+ +### Unit + +- planner decoder strictness +- replanner decoder strictness +- DAG validation +- runtime limits +- retry policy +- status transition helpers +- idempotency canonicalization +- outbox dispatcher idempotency and replay (Stage 1) +- blackboard CAS and stale writer rejection (Stage 2) +- env lease fencing and approval token binding (Stage 3) +- resource ledger invariants (Stage 4) +- experience promotion gates (Stage 5) + +### Integration + +- migration up/down +- checkpoint resolve/timeout +- planning intent recovery +- task dispatch/retry +- verification outcomes +- projection snapshot reads +- JetStream-backed fact ingress and committed event delivery (Stage 1) +- blackboard rebuild from Postgres (Stage 2) +- env reserve/commit/abort and snapshot capture (Stage 3) +- admission under contention and reclaim of stale holds (Stage 4) +- experience retrieval through planner/scheduler/verifier paths (Stage 5) + +### Blackbox / E2E + +- `StartRun -> plan -> dispatch -> workerd -> verifyd -> completed` +- worker failure -> replan -> replacement task -> completed +- verifier rejection -> replan -> completed +- checkpoint pause -> resolve -> resume +- checkpoint timeout -> default action -> resume +- worker crash / lease expiry -> retry or recover +- ack-loss replay for worker/verifier +- run completes with `workerd`/`verifyd` driven only via JetStream (Stage 1) +- run completes after blackboard KV is wiped mid-run (Stage 2) +- interactive task pauses, holds env session, resumes across attempts (Stage 3) +- run blocks on quota then resumes after release (Stage 4) +- repeated similar runs benefit from experience records (Stage 5) +- UI reads run/tasks/checkpoints/artifacts/events through public APIs at every stage + +## Resource Expectations + +Current always-on cost: + +- `server` +- `workerd` +- `verifyd` +- `web` +- Postgres +- NATS JetStream (becomes a runtime dependency in Stage 1) +- optional `qdrant`/`sparse`/`browser` + +`workerd` and `verifyd` are 
around 70-80 MB RSS each. That is Go daemon idle overhead from config, DB pool, model/runtime wiring, and container/workspace deps, not per-task worker memory. After Stage 1, JetStream-driven wakeups will reduce CPU spend during idle, but RSS stays similar. RSS reduction would require process consolidation or on-demand supervision and is not in this plan. + +NATS adds modest baseline cost when used at scale: one JetStream stream per subject family with default replicas, plus durable consumer state. This stays well within typical dev/staging budgets. + +## Stage Completion Checklists + +Stage 0: + +- [ ] backend regression and DB integration/blackbox tests pass +- [ ] migration up/down both pass +- [ ] frontend build passes in a clean `pnpm` environment +- [ ] orchestration page is usable on restored dev data +- [ ] HITL barrier semantics match `RFC.md` invariants + +Stage 1: + +- [x] JetStream streams provisioned with documented retention/dedup window in `internal/orchestrationbus/jetstream.go` +- [x] outbox dispatcher publishes committed events idempotently and stamps `published_at` per row +- [x] `WatchRun` subscribes to the bus before reading the snapshot and reconciles against Postgres on a 5s tick +- [x] replay-from-`seq` works for late subscribers via Postgres backfill plus bus dedupe on `afterSeq` +- [x] kernel-side fact consumer in `internal/orchestrationfacts` validates `attempt.fact.*` envelopes against Postgres and emits structured outcomes for operators +- [x] live UI thinking/activity stream consumes SSE-backed `WatchRun` (`apps/web/src/pages/orchestration/composables/use-run-event-stream.ts`) and falls back to a 5s poll only when the bus is unavailable +- [ ] `workerd` / `verifyd` drive control transitions through the bus instead of direct service calls (gated on Stage 2 blackboard CAS so fencing semantics survive the move) + +Stage 2: + +- [x] blackboard KV layout matches `RFC.md` (`internal/orchestrationblackboard/keys.go`, `value.go`) +- [x] CAS 
rules and stale writer rejection are enforced (`internal/orchestrationblackboard/writer.go`, `inmem.go`, `jetstream.go`) +- [x] dispatch produces frozen `InputManifest` covering predecessor `result.*` and run-scope context (`internal/orchestration/blackboard.go::captureBlackboardRevisions`) +- [x] `Service.RebuildBlackboard(run_id)` reconstructs the runtime view from Postgres (`internal/orchestration/blackboard.go`) and is reachable through `POST /orchestration/runs/{run_id}/blackboard/rebuild` +- [ ] artifact/verifier rebuild paths land alongside Stage 3/Stage 4 once those stores are persisted + +Stage 3: + +- [ ] env tables and `sqlc` queries shipped +- [ ] `EnvManager` interface and at least two backends in place +- [ ] action ledger captures classified env actions +- [ ] HITL hold/resume works for held env sessions +- [ ] approval tokens enforced for irreversible actions + +Stage 4: + +- [ ] resource tables and `sqlc` queries shipped +- [ ] admission consults quota and current holds +- [ ] reclaim/expire writes to ledger +- [ ] fairness behavior under contention is testable + +Stage 5: + +- [ ] experience tables and `sqlc` queries shipped +- [ ] promotion pipeline gates by verification status +- [ ] planner/scheduler/verifier consult experience +- [ ] retention and rollback supported + +## Next Step + +Where we are right now: + +1. Stage 0 stabilization (executor recovery, planner hardening, HITL barriers, observability, hygiene) is the substrate the rest of this plan rides on. Keep its tests green as later stages land. +2. Stage 1 (NATS event bus) is mostly landed: subject contract, in-memory + JetStream bus, committed-event outbox, bus-backed `WatchRun`, fact emission from `workerd`/`verifyd`, the kernel-side fact consumer, and the SSE-driven UI thinking/activity stream are all in. 
The two open bullets are (a) moving control-plane state transitions onto the bus instead of direct daemon → kernel calls (now unblocked by Stage 2 CAS) and (b) a JetStream-backed blackbox test against the Compose `nats` service. +3. Stage 2 (blackboard) is in: contract + namespaces, in-memory + JetStream KV stores, writer ownership / CAS, frozen `InputManifest` capture at dispatch, post-commit `result.summary` publish, and the Postgres-backed `RebuildBlackboard` recovery path with an admin endpoint. Remaining work is a JetStream-backed blackbox wipe-and-recover test (shares the harness with Stage 1's open blackbox bullet) and rebuild coverage for artifacts and verifier notes once those persistence stores land in Stages 3-4. +4. Stage 3 (env session runtime) is the next net-new stage. The blackboard CAS layer plus the kernel-side fact consumer give us the substrate to thread `env_lease_epoch` / `env_lease_token` through worker writes without losing fencing guarantees. +5. Stages 4 and 5 follow in order. Every earlier stage's tests must keep passing as later stages land. + +Each stage ends with docs and PR descriptions updated to reflect actual landed behavior, not target state. 
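The Stage 1 replay bullet (Postgres backfill plus bus dedupe on `afterSeq`) can be sketched as below. This is a minimal illustration under stated assumptions, not the code in this branch: `RunEvent` and `mergeBackfillAndLive` are hypothetical names, the event shape is reduced to `seq` and `type`, and the real `WatchRun` is a Go service method rather than a TypeScript helper.

```typescript
// Hypothetical, simplified event shape; the real envelope is defined by the kernel.
interface RunEvent {
  seq: number
  type: string
}

// Merge a Postgres backfill with events that were buffered from the bus while
// the snapshot was being read. Events at or below afterSeq, and events whose
// seq was already emitted, are dropped so each seq is delivered at most once.
function mergeBackfillAndLive(
  afterSeq: number,
  backfill: RunEvent[],
  buffered: RunEvent[],
): RunEvent[] {
  const out: RunEvent[] = []
  let lastSeq = afterSeq
  // Sort a copy by seq so backfill and bus events interleave in commit order.
  const ordered = [...backfill, ...buffered].sort((a, b) => a.seq - b.seq)
  for (const event of ordered) {
    if (event.seq <= lastSeq) continue // already seen by the client, or a duplicate
    out.push(event)
    lastSeq = event.seq
  }
  return out
}
```

Subscribing before reading the snapshot is what makes this merge safe: an event committed during the snapshot read shows up in the buffered list, in the backfill, or in both, and the `seq` comparison collapses the overlap.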
diff --git a/apps/web/package.json b/apps/web/package.json index d544e6000..910ba02fc 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -27,6 +27,7 @@ "@tailwindcss/vite": "^4.2.2", "@tanstack/vue-table": "^8.21.3", "@vee-validate/zod": "^4.15.1", + "@vue-flow/core": "^1.45.0", "@vueuse/core": "^14.1.0", "@vueuse/integrations": "^14.2.1", "@xterm/addon-fit": "^0.11.0", @@ -35,6 +36,7 @@ "animate.css": "^4.1.1", "cron-parser": "^5.5.0", "cronstrue": "^3.14.0", + "dagre": "^0.8.5", "dotenv": "^17.2.3", "echarts": "^6.0.0", "katex": "^0.16.28", @@ -63,6 +65,7 @@ }, "devDependencies": { "@memohai/config": "workspace:*", + "@types/dagre": "^0.7.54", "@types/moment": "^2.13.0", "@types/node": "^24.10.1", "@types/qrcode": "^1.5.6", diff --git a/apps/web/src/components/orchestration-sidebar/index.vue b/apps/web/src/components/orchestration-sidebar/index.vue new file mode 100644 index 000000000..37285f5c0 --- /dev/null +++ b/apps/web/src/components/orchestration-sidebar/index.vue @@ -0,0 +1,107 @@ + + + diff --git a/apps/web/src/components/sidebar/index.vue b/apps/web/src/components/sidebar/index.vue index 8a0ca7f43..a219ea100 100644 --- a/apps/web/src/components/sidebar/index.vue +++ b/apps/web/src/components/sidebar/index.vue @@ -84,6 +84,17 @@
+ + + + {{ t('sidebar.orchestration') }} + + (() => sortBots(botData.value?.items ?? [])) const isSettingsActive = computed(() => route.path.startsWith('/settings')) +const isOrchestrationActive = computed(() => route.path.startsWith('/orchestration')) diff --git a/apps/web/src/i18n/locales/en.json b/apps/web/src/i18n/locales/en.json index 477fd3dca..ee3319791 100644 --- a/apps/web/src/i18n/locales/en.json +++ b/apps/web/src/i18n/locales/en.json @@ -104,6 +104,8 @@ "platform": "Platform", "usage": "Usage", "appearance": "Appearance", + "browser": "Browser", + "orchestration": "Orchestration", "supermarket": "Supermarket", "about": "About" }, @@ -614,6 +616,304 @@ "name": "Name", "namePlaceholder": "Configuration name" }, + "orchestration": { + "title": "Orchestration", + "botLabel": "Bot", + "runList": "Runs", + "noBotsTitle": "No Bots", + "noBotsDescription": "Create a bot first to inspect orchestration runs.", + "noRunsTitle": "No Runs", + "noRunsDescription": "This bot has no orchestration runs yet.", + "summary": "Run Summary", + "dag": "Tasks", + "timeline": "Timeline", + "taskDetail": "Task Detail", + "snapshotSeq": "Snapshot Seq", + "goal": "Goal", + "rootTask": "Root Task", + "lifecycleStatus": "Lifecycle", + "planningStatus": "Planning", + "terminalReason": "Terminal Reason", + "createdAt": "Created At", + "updatedAt": "Updated At", + "finishedAt": "Finished At", + "openCheckpoints": "Open Checkpoints", + "readyTasks": "Ready", + "dispatchingTasks": "Dispatching", + "runningTasks": "Running", + "verifyingTasks": "Verifying", + "waitingHumanTasks": "Waiting Human", + "completedTasks": "Completed", + "failedTasks": "Failed", + "cancelledTasks": "Cancelled", + "activeAttempts": "Active Attempts", + "activeVerifications": "Active Verifications", + "activeWorkers": "Active Workers", + "status": "Status", + "workerProfile": "Worker Profile", + "selectedTask": "Selected Task", + "attempts": "Attempts", + "verifications": "Verifications", + "checkpoints": 
"Checkpoints", + "artifacts": "Artifacts", + "results": "Results", + "dependencies": "Dependencies", + "noTaskSelected": "Select a task to inspect execution details.", + "noTimeline": "No timeline events yet.", + "refresh": "Refresh", + "runInspector": "Run Inspector", + "metricTotalTasks": "Tasks", + "searchRunsPlaceholder": "Filter runs…", + "noRunMatches": "No runs match your filter.", + "runIdShort": "Run", + "rootTaskId": "Root", + "copyRunId": "Copy run id", + "copyTaskId": "Copy task id", + "graphHint": "Click node or table row", + "taskList": "Tasks", + "runs": "Runs", + "task": "Task", + "root": "Root", + "latestResult": "Latest result", + "rawFields": "Details", + "reason": "Reason", + "loadingInspector": "Loading run…", + "inspectorLoadFailed": "Failed to load run inspector", + "checkpointOpen": "open", + "workers": "Workers", + "workerTableCol": "Wkr", + "tasksPanelTitle": "Task list and details", + "tasksPanelHint": "Click a row on the left — the right pane shows the same task. The DAG stays in sync.", + "tasksPanelAria": "Task list and task detail", + "noTaskSelectedHint": "Select a task in the list on the left (or a node in the graph). 
Details for that task appear here.", + "studioTitle": "Agent DAG Studio", + "studioSubtitle": "Orchestration graph and execution inspector", + "runAction": "Run", + "runStarted": "Run started", + "runStartFailed": "Failed to start run", + "retryRun": "Retry run", + "runRetryStarted": "Run restarted", + "runRetryFailed": "Failed to retry run", + "retryTask": "Retry task", + "taskRetryStarted": "Task requeued", + "taskRetryFailed": "Failed to retry task", + "stopTask": "Stop task", + "taskStopRequested": "Task stopped", + "taskStopFailed": "Failed to stop task", + "stopRun": "Stop run", + "runStopRequested": "Stop requested", + "runStopFailed": "Failed to stop run", + "checkpointResolved": "Checkpoint resolved", + "checkpointResolveFailed": "Failed to resolve checkpoint", + "copyFailed": "Failed to copy", + "autoLayout": "Auto Layout", + "fitView": "Fit View", + "fullscreen": "Fullscreen", + "zoomIn": "Zoom in", + "zoomOut": "Zoom out", + "taskDag": "Tasks", + "runFlow": "Run Flow", + "dagView": "DAG", + "flowView": "Flow", + "noFlowSpans": "No flow records yet", + "flowPlanning": "Planning", + "flowReplanning": "Replanning", + "flowAttempt": "Attempt", + "flowAttemptFinalize": "Finalize attempt", + "flowVerification": "Verification", + "flowCheckpoint": "Human checkpoint", + "flowCheckpointResume": "Resume checkpoint", + "flowStep": "Step", + "flowSteps": "Steps", + "flowActions": "Actions", + "flowActionCount": "{count} actions", + "runFlowDescription": "Shows planning, attempts, replanning, verification, and human checkpoints in the order they actually happened.", + "flowOpenInspectorHint": "Click to inspect node actions", + "flowSystemStepHint": "System flow step", + "searchBotsPlaceholder": "Search bots", + "searchTasksPlaceholder": "Search tasks", + "rootTaskEntryHint": "Entry node for decomposed tasks", + "nodeLibrary": "Node Library", + "allTasks": "All Tasks", + "noTaskSelectedShort": "No task selected", + "lastRun": "Last run", + "nodeInspector": "Node 
Inspector", + "node": "node", + "config": "Config", + "env": "Environment", + "envResources": "Environments", + "images": "Images", + "addImage": "Add Image", + "imageDetail": "Image Details", + "defaultImage": "Default", + "addedImage": "Added", + "imageName": "Name", + "imageAddress": "Image", + "imageTag": "Image tag", + "imageType": "Type", + "imageTypeRegistry": "Existing image", + "imageTypeDockerfile": "Build from Dockerfile", + "imageSource": "Source", + "imageSourceRegistry": "Existing image", + "imageSourceDockerfile": "Dockerfile", + "imageRegistryDockerHub": "Docker Hub", + "imageRegistryGHCR": "GitHub Container Registry", + "imageRegistryQuay": "Quay", + "imageRegistryCustom": "Custom Registry", + "imageRegistryLocalBuild": "Local build", + "imageStatusReady": "Ready", + "imageStatusPending": "Pending build", + "imageStatusBuilding": "Building", + "imageStatusFailed": "Failed", + "imageDigest": "Digest", + "imageBuildError": "Build Error", + "imageNotFound": "Image not found", + "dockerfile": "Dockerfile", + "dockerfileFile": "Dockerfile file", + "importDockerfile": "Import", + "imageBuildUnsupported": "The current container backend does not support Dockerfile builds", + "imageBuildTarget": "Build target", + "imageBuildPlatform": "Platform", + "imageBuildNoCache": "No cache", + "imageBuildPull": "Pull base image", + "imageBuildArgs": "Build args", + "imageBuildArgName": "Arg name", + "imageBuildArgValue": "Value", + "imageBuildLabels": "Labels", + "imageBuildLabelName": "Label name", + "imageBuildLabelValue": "Value", + "imageAdded": "Added", + "imageAddFailed": "Could not add image", + "loadingImages": "Loading", + "addEnv": "Add Env", + "editEnv": "Edit Env", + "deleteEnv": "Delete Env", + "deleteEnvConfirm": "This cannot be undone. 
Environments with run history cannot be deleted; archive them instead.", + "loadingEnvResources": "Loading", + "noEnvResources": "None", + "envResourceName": "Name", + "envResourceType": "Type", + "envResourceNameRequired": "Name is required", + "envResourceImageRequired": "Image is required", + "envResourcePromptName": "Resource name", + "envContainerImage": "Container image", + "envResourceImage": "Image", + "envResourceStatus": "Status", + "envVariableName": "Name", + "envVariableValue": "Value", + "envResourceEnv": "Environment variables", + "envResourceCreated": "Added", + "envResourceCreateFailed": "Could not add environment", + "envResourceUpdated": "Saved", + "envResourceUpdateFailed": "Could not save environment", + "envResourceDeleted": "Deleted", + "envResourceDeleteFailed": "Could not delete environment", + "taskInfo": "Info", + "inputs": "Inputs", + "outputs": "Outputs", + "logs": "Logs", + "act": "Act", + "noAct": "No actions yet", + "thinking": "Thinking", + "noThinking": "None", + "thinkingReasoning": "Thinking", + "thinkingRead": "Read file", + "thinkingListed": "Listed directory", + "thinkingRanCommand": "Ran command", + "thinkingUsedTool": "Used tool", + "thinkingGeneratedResult": "Generated result", + "belongsToRun": "Belongs to Run", + "description": "Description", + "worker": "Worker", + "attempt": "Attempt", + "upstreamNodes": "Upstream Nodes", + "downstreamNodes": "Downstream Nodes", + "execution": "Execution", + "toolCalls": "Tool calls", + "resultSummary": "Result summary", + "resultDetails": "Details", + "technicalDetails": "Debug info", + "relatedTasks": "Related tasks", + "outputRecords": "Output records", + "validationRecords": "Validation records", + "pendingQuestions": "Pending questions", + "noEnv": "No reservation", + "envSession": "Session", + "envLease": "Lease version", + "envBinding": "Binding", + "envSnapshots": "Snapshots", + "envDriftStatus": "Changes", + "envBindingDetails": "Reservation", + "envResource": "Resource", + 
"envMode": "Access", + "envEffectClass": "Side effects", + "envLeaseToken": "Lease token", + "envLeaseEpoch": "Version {epoch}", + "envChanged": "Changed", + "envUnchanged": "No change", + "envChangeUnknown": "Not enough data", + "envNotUsed": "Not used", + "envSnapshotBefore": "Before", + "envSnapshotAfter": "After", + "envSnapshotPeriodic": "Checkpoint", + "envActionReserved": "Reserved", + "envActionReleased": "Released", + "envActionHeld": "Kept for human input", + "envActionResumed": "Reused after input", + "envModeRead": "Read only", + "envModeWrite": "Read and write", + "envEffectLocalRead": "Local read", + "envEffectLocalChange": "Local change", + "envEffectExternalRead": "Reads external service", + "envEffectExternalWrite": "Writes to external service", + "envEffectExternalIrreversible": "Needs approval", + "envKindContainer": "Container", + "envKindBrowser": "Browser", + "noEnvSnapshots": "None", + "executionAttempt": "Execution #{count}", + "validationRun": "Validation run", + "noExecutionSummary": "No execution summary yet", + "noExecutionSpans": "None", + "artifactsInputs": "Artifacts / Inputs", + "inputManifests": "Input manifests", + "checkpoint": "Checkpoint", + "taskId": "Task ID", + "heartbeat": "Heartbeat", + "tasks": "Tasks", + "blockedTasks": "Blocked", + "pendingTasks": "Pending", + "statusSuccess": "Success", + "statusPending": "Pending", + "statusIdle": "Idle", + "statusActive": "Active", + "statusDisabled": "Disabled", + "statusArchived": "Archived", + "statusRunning": "Running", + "statusDispatching": "Dispatching", + "statusVerifying": "Verifying", + "statusWaitingHuman": "Waiting for input", + "statusFailed": "Failed", + "statusBlocked": "Blocked", + "statusCancelled": "Cancelled", + "stageRootGoal": "Root goal", + "stageTaskCount": "{count} tasks", + "stageRunGoal": "Run Goal", + "stageInput": "Input", + "stageUnderstanding": "Understanding", + "stagePlanning": "Planning", + "stageExecution": "Execution", + "stageValidation": 
"Validation", + "stageOutput": "Output", + "nodeKindTrigger": "Trigger", + "nodeKindLlm": "LLM", + "nodeKindPlanner": "Planner", + "nodeKindSearch": "Search", + "nodeKindTool": "Tool", + "nodeKindMemory": "Memory", + "nodeKindMerge": "Merge", + "nodeKindValidation": "Validation", + "nodeKindOutput": "Output" + }, "speech": { "title": "Speech", "add": "Add Speech Provider", diff --git a/apps/web/src/i18n/locales/zh.json b/apps/web/src/i18n/locales/zh.json index 9587af07d..2658ec04d 100644 --- a/apps/web/src/i18n/locales/zh.json +++ b/apps/web/src/i18n/locales/zh.json @@ -105,6 +105,8 @@ "usage": "用量统计", "appearance": "外观定制", "supermarket": "应用市场", + "browser": "浏览器", + "orchestration": "编排", "about": "关于" }, "breadcrumb": { @@ -610,6 +612,304 @@ "name": "名称", "namePlaceholder": "输入配置名称" }, + "orchestration": { + "title": "编排", + "botLabel": "Bot", + "runList": "运行", + "noBotsTitle": "暂无 Bot", + "noBotsDescription": "请先创建 Bot, 再查看编排运行。", + "noRunsTitle": "暂无运行", + "noRunsDescription": "当前 Bot 还没有编排运行记录。", + "summary": "运行概览", + "dag": "任务", + "timeline": "时间线", + "taskDetail": "任务详情", + "snapshotSeq": "快照序号", + "goal": "目标", + "rootTask": "根任务", + "lifecycleStatus": "生命周期", + "planningStatus": "规划状态", + "terminalReason": "终止原因", + "createdAt": "创建时间", + "updatedAt": "更新时间", + "finishedAt": "完成时间", + "openCheckpoints": "打开的检查点", + "readyTasks": "就绪", + "dispatchingTasks": "调度中", + "runningTasks": "运行中", + "verifyingTasks": "验证中", + "waitingHumanTasks": "等待确认", + "completedTasks": "已完成", + "failedTasks": "失败", + "cancelledTasks": "已取消", + "activeAttempts": "活跃尝试", + "activeVerifications": "活跃验证", + "activeWorkers": "活跃执行器", + "status": "状态", + "workerProfile": "执行器配置", + "selectedTask": "选中任务", + "attempts": "执行尝试", + "verifications": "验证记录", + "checkpoints": "检查点", + "artifacts": "产物", + "results": "结果", + "dependencies": "依赖", + "noTaskSelected": "请选择一个任务以查看执行细节。", + "noTimeline": "暂无时间线事件。", + "refresh": "刷新", + "runInspector": "运行检查器", + "metricTotalTasks": "任务数", 
+ "searchRunsPlaceholder": "筛选运行…", + "noRunMatches": "没有匹配的运行。", + "runIdShort": "运行", + "rootTaskId": "根任务", + "copyRunId": "复制运行 ID", + "copyTaskId": "复制任务 ID", + "graphHint": "点击节点或表格行", + "taskList": "任务列表", + "runs": "运行", + "task": "任务", + "root": "根", + "latestResult": "最新结果", + "rawFields": "原始字段", + "reason": "原因", + "loadingInspector": "加载运行中…", + "inspectorLoadFailed": "运行详情加载失败", + "checkpointOpen": "打开", + "workers": "执行器", + "workerTableCol": "执", + "tasksPanelTitle": "任务列表与详情", + "tasksPanelHint": "在左侧点选一行,右侧会显示同一任务;与上方 DAG 选中状态一致。", + "tasksPanelAria": "任务列表与任务详情", + "noTaskSelectedHint": "在左侧列表或上方 DAG 中选择一个任务,右侧会显示对应详情。", + "studioTitle": "编排工作台", + "studioSubtitle": "编排图与执行检查器", + "runAction": "启动运行", + "runStarted": "运行已启动", + "runStartFailed": "启动运行失败", + "retryRun": "重新运行", + "runRetryStarted": "已重新发起运行", + "runRetryFailed": "重新运行失败", + "retryTask": "重试任务", + "taskRetryStarted": "已重新调度任务", + "taskRetryFailed": "重试任务失败", + "stopTask": "停止任务", + "taskStopRequested": "已停止任务", + "taskStopFailed": "停止任务失败", + "stopRun": "停止运行", + "runStopRequested": "已请求停止运行", + "runStopFailed": "停止运行失败", + "checkpointResolved": "已提交确认", + "checkpointResolveFailed": "提交确认失败", + "copyFailed": "复制失败", + "autoLayout": "自动布局", + "fitView": "适配视图", + "fullscreen": "全屏", + "zoomIn": "放大", + "zoomOut": "缩小", + "taskDag": "任务", + "runFlow": "流程", + "dagView": "DAG", + "flowView": "流程", + "noFlowSpans": "暂无流程记录", + "flowPlanning": "规划", + "flowReplanning": "重规划", + "flowAttempt": "执行尝试", + "flowAttemptFinalize": "完成尝试", + "flowVerification": "验证", + "flowCheckpoint": "人工确认", + "flowCheckpointResume": "恢复人工确认", + "flowStep": "步骤", + "flowSteps": "步骤", + "flowActions": "动作", + "flowActionCount": "{count} 个动作", + "runFlowDescription": "按真实发生顺序展示规划、执行尝试、重规划、验证和人工确认。", + "flowOpenInspectorHint": "点击查看节点动作", + "flowSystemStepHint": "系统流程步骤", + "searchBotsPlaceholder": "搜索 Bot", + "searchTasksPlaceholder": "搜索任务", + "rootTaskEntryHint": "分解后任务的入口节点", + "nodeLibrary": "节点图例", + 
"allTasks": "全部任务", + "noTaskSelectedShort": "未选择任务", + "lastRun": "最近运行", + "nodeInspector": "节点检查器", + "node": "节点", + "config": "配置", + "env": "环境", + "envResources": "环境", + "images": "镜像", + "addImage": "添加镜像", + "imageDetail": "镜像详情", + "defaultImage": "默认", + "addedImage": "已添加", + "imageName": "名称", + "imageAddress": "镜像地址", + "imageTag": "镜像标签", + "imageType": "类型", + "imageTypeRegistry": "现有镜像", + "imageTypeDockerfile": "从 Dockerfile 构建", + "imageSource": "来源", + "imageSourceRegistry": "现有镜像", + "imageSourceDockerfile": "Dockerfile", + "imageRegistryDockerHub": "Docker Hub", + "imageRegistryGHCR": "GitHub Container Registry", + "imageRegistryQuay": "Quay", + "imageRegistryCustom": "自定义 Registry", + "imageRegistryLocalBuild": "本地构建", + "imageStatusReady": "可用", + "imageStatusPending": "待构建", + "imageStatusBuilding": "构建中", + "imageStatusFailed": "失败", + "imageDigest": "摘要", + "imageBuildError": "构建错误", + "imageNotFound": "镜像不存在", + "dockerfile": "Dockerfile", + "dockerfileFile": "Dockerfile 文件", + "importDockerfile": "导入", + "imageBuildUnsupported": "当前容器后端不支持 Dockerfile 构建", + "imageBuildTarget": "构建阶段", + "imageBuildPlatform": "平台", + "imageBuildNoCache": "不使用缓存", + "imageBuildPull": "拉取基础镜像", + "imageBuildArgs": "构建参数", + "imageBuildArgName": "参数名", + "imageBuildArgValue": "值", + "imageBuildLabels": "标签", + "imageBuildLabelName": "标签名", + "imageBuildLabelValue": "值", + "imageAdded": "已添加", + "imageAddFailed": "添加失败", + "loadingImages": "加载中", + "addEnv": "添加环境", + "editEnv": "编辑环境", + "deleteEnv": "删除环境", + "deleteEnvConfirm": "删除后无法恢复。已有运行记录的环境不能删除,可以改为归档。", + "loadingEnvResources": "加载中", + "noEnvResources": "暂无", + "envResourceName": "名称", + "envResourceType": "类型", + "envResourceNameRequired": "先填写名称", + "envResourceImageRequired": "先填写镜像", + "envResourcePromptName": "资源名", + "envContainerImage": "容器镜像", + "envResourceImage": "镜像", + "envResourceStatus": "状态", + "envVariableName": "变量名", + "envVariableValue": "值", + "envResourceEnv": "环境变量", + 
"envResourceCreated": "已添加", + "envResourceCreateFailed": "添加失败", + "envResourceUpdated": "已保存", + "envResourceUpdateFailed": "保存失败", + "envResourceDeleted": "已删除", + "envResourceDeleteFailed": "删除失败", + "taskInfo": "任务信息", + "inputs": "输入", + "outputs": "输出", + "logs": "日志", + "act": "动作", + "noAct": "暂无动作", + "thinking": "思考", + "noThinking": "暂无", + "thinkingReasoning": "正在思考", + "thinkingRead": "读取文件", + "thinkingListed": "查看目录", + "thinkingRanCommand": "运行命令", + "thinkingUsedTool": "使用工具", + "thinkingGeneratedResult": "生成结果", + "belongsToRun": "所属运行", + "description": "描述", + "worker": "执行器", + "attempt": "执行尝试", + "upstreamNodes": "上游节点", + "downstreamNodes": "下游节点", + "execution": "执行", + "toolCalls": "工具调用", + "resultSummary": "结果摘要", + "resultDetails": "详细内容", + "technicalDetails": "调试信息", + "relatedTasks": "关联任务", + "outputRecords": "输出记录", + "validationRecords": "验证记录", + "pendingQuestions": "待确认", + "noEnv": "未申请环境", + "envSession": "会话", + "envLease": "租约版本", + "envBinding": "绑定", + "envSnapshots": "快照", + "envDriftStatus": "变化", + "envBindingDetails": "分配情况", + "envResource": "资源", + "envMode": "访问方式", + "envEffectClass": "会做什么", + "envLeaseToken": "租约令牌", + "envLeaseEpoch": "第 {epoch} 版", + "envChanged": "有变化", + "envUnchanged": "无变化", + "envChangeUnknown": "信息不足", + "envNotUsed": "未使用", + "envSnapshotBefore": "执行前", + "envSnapshotAfter": "执行后", + "envSnapshotPeriodic": "中途记录", + "envActionReserved": "已分配", + "envActionReleased": "已释放", + "envActionHeld": "等待人工确认时保留", + "envActionResumed": "确认后继续使用", + "envModeRead": "只读", + "envModeWrite": "可读写", + "envEffectLocalRead": "本地读取", + "envEffectLocalChange": "本地修改", + "envEffectExternalRead": "读取外部服务", + "envEffectExternalWrite": "写入外部服务", + "envEffectExternalIrreversible": "需要审批", + "envKindContainer": "容器", + "envKindBrowser": "浏览器", + "noEnvSnapshots": "暂无", + "executionAttempt": "第 {count} 次执行", + "validationRun": "验证执行", + "noExecutionSummary": "暂无执行摘要", + "noExecutionSpans": "暂无", + 
"artifactsInputs": "产物 / 输入", + "inputManifests": "输入快照", + "checkpoint": "检查点", + "taskId": "任务 ID", + "heartbeat": "心跳", + "tasks": "任务", + "blockedTasks": "阻塞", + "pendingTasks": "等待", + "statusSuccess": "成功", + "statusPending": "待处理", + "statusIdle": "空闲", + "statusActive": "活跃", + "statusDisabled": "已禁用", + "statusArchived": "已归档", + "statusRunning": "运行中", + "statusDispatching": "调度中", + "statusVerifying": "验证中", + "statusWaitingHuman": "等待确认", + "statusFailed": "失败", + "statusBlocked": "阻塞", + "statusCancelled": "已取消", + "stageRootGoal": "总目标", + "stageTaskCount": "{count} 个任务", + "stageRunGoal": "总目标", + "stageInput": "输入", + "stageUnderstanding": "理解", + "stagePlanning": "规划", + "stageExecution": "执行", + "stageValidation": "验证", + "stageOutput": "输出", + "nodeKindTrigger": "入口", + "nodeKindLlm": "LLM", + "nodeKindPlanner": "规划器", + "nodeKindSearch": "搜索", + "nodeKindTool": "工具", + "nodeKindMemory": "记忆", + "nodeKindMerge": "合并", + "nodeKindValidation": "验证", + "nodeKindOutput": "输出" + }, "speech": { "title": "语音合成", "add": "添加语音合成提供方", diff --git a/apps/web/src/pages/orchestration-section/index.vue b/apps/web/src/pages/orchestration-section/index.vue new file mode 100644 index 000000000..ad6ef767a --- /dev/null +++ b/apps/web/src/pages/orchestration-section/index.vue @@ -0,0 +1,26 @@ + + + diff --git a/apps/web/src/pages/orchestration/components/flow-gap-node.vue b/apps/web/src/pages/orchestration/components/flow-gap-node.vue new file mode 100644 index 000000000..51ff85ed7 --- /dev/null +++ b/apps/web/src/pages/orchestration/components/flow-gap-node.vue @@ -0,0 +1,20 @@ + + + diff --git a/apps/web/src/pages/orchestration/components/flow-lane-node.vue b/apps/web/src/pages/orchestration/components/flow-lane-node.vue new file mode 100644 index 000000000..56c6b02da --- /dev/null +++ b/apps/web/src/pages/orchestration/components/flow-lane-node.vue @@ -0,0 +1,80 @@ + + + diff --git a/apps/web/src/pages/orchestration/components/flow-span-node.vue 
b/apps/web/src/pages/orchestration/components/flow-span-node.vue new file mode 100644 index 000000000..d8ec17bae --- /dev/null +++ b/apps/web/src/pages/orchestration/components/flow-span-node.vue @@ -0,0 +1,161 @@ + + + diff --git a/apps/web/src/pages/orchestration/components/lane-node.vue b/apps/web/src/pages/orchestration/components/lane-node.vue new file mode 100644 index 000000000..65117c2bf --- /dev/null +++ b/apps/web/src/pages/orchestration/components/lane-node.vue @@ -0,0 +1,31 @@ + + + diff --git a/apps/web/src/pages/orchestration/components/run-dag.vue b/apps/web/src/pages/orchestration/components/run-dag.vue new file mode 100644 index 000000000..cc353fc89 --- /dev/null +++ b/apps/web/src/pages/orchestration/components/run-dag.vue @@ -0,0 +1,270 @@ + + + + + diff --git a/apps/web/src/pages/orchestration/components/run-flow.vue b/apps/web/src/pages/orchestration/components/run-flow.vue new file mode 100644 index 000000000..66e72af24 --- /dev/null +++ b/apps/web/src/pages/orchestration/components/run-flow.vue @@ -0,0 +1,314 @@ + + + + + diff --git a/apps/web/src/pages/orchestration/components/task-flow-node.vue b/apps/web/src/pages/orchestration/components/task-flow-node.vue new file mode 100644 index 000000000..50a20ac25 --- /dev/null +++ b/apps/web/src/pages/orchestration/components/task-flow-node.vue @@ -0,0 +1,193 @@ + + + diff --git a/apps/web/src/pages/orchestration/composables/use-dag-graph.ts b/apps/web/src/pages/orchestration/composables/use-dag-graph.ts new file mode 100644 index 000000000..862a9b401 --- /dev/null +++ b/apps/web/src/pages/orchestration/composables/use-dag-graph.ts @@ -0,0 +1,397 @@ +import { computed, type ComputedRef } from 'vue' +import dagre from 'dagre' +import { MarkerType, type Edge, type Node } from '@vue-flow/core' +import type { RunInspectorDependency, RunInspectorPayload, RunInspectorTask } from '../model' + +export type TaskNodeKind = + | 'trigger' + | 'llm' + | 'planner' + | 'search' + | 'tool' + | 'memory' + | 'merge' 
+ | 'verify' + | 'output' + +export interface TaskFlowNodeData { + task: RunInspectorTask + isRoot: boolean + isSelected: boolean + isRelated: boolean + hasSelection: boolean + canRetry?: boolean + isRetrying?: boolean + onRetryTask?: (taskID: string) => void + level: number + kind: TaskNodeKind + maxLevel: number +} + +export interface LaneNodeData { + level: number + count: number + isRootLane: boolean +} + +const NODE_WIDTH = 208 +const NODE_HEIGHT = 80 +const RANK_SEP = 120 +const NODE_SEP = 32 +const LANE_PAD_X = 28 +const LANE_PAD_Y_TOP = 76 +const LANE_PAD_Y_BOTTOM = 32 + +const EDGE_COLOR_DEFAULT = '#9ca3af' +const EDGE_COLOR_ACTIVE = '#6366f1' + +type DagRelation = Pick<RunInspectorDependency, 'id' | 'predecessor_task_id' | 'successor_task_id'> & { + structural?: boolean +} + +function relationKey(pred?: string, succ?: string): string { + return `${pred ?? ''}->${succ ?? ''}` +} + +function buildDagRelations(tasks: RunInspectorTask[], dependencies: RunInspectorDependency[]): DagRelation[] { + const taskIDs = new Set(tasks.map((task) => task.id).filter(Boolean) as string[]) + const seen = new Set<string>() + const relations: DagRelation[] = [] + + for (const dependency of dependencies) { + const pred = dependency.predecessor_task_id + const succ = dependency.successor_task_id + if (!pred || !succ || !taskIDs.has(pred) || !taskIDs.has(succ)) continue + seen.add(relationKey(pred, succ)) + relations.push(dependency) + } + + for (const task of tasks) { + const succ = task.id + const pred = task.decomposed_from_task_id + if (!pred || !succ || !taskIDs.has(pred) || !taskIDs.has(succ)) continue + const key = relationKey(pred, succ) + if (seen.has(key)) continue + seen.add(key) + relations.push({ + id: `decompose:${pred}:${succ}`, + predecessor_task_id: pred, + successor_task_id: succ, + structural: true, + }) + } + + return relations +} + +export function buildTaskLevels( + taskList: RunInspectorTask[], + edges: DagRelation[], +): Map<string, number> { + const byID = new Map<string, RunInspectorTask>() + for (const task of taskList) { + if (task.id) byID.set(task.id, task) + } + const 
incoming = new Map<string, number>() + const outgoing = new Map<string, string[]>() + + for (const task of taskList) { + if (!task.id) continue + incoming.set(task.id, 0) + outgoing.set(task.id, []) + } + + for (const edge of edges) { + const pred = edge.predecessor_task_id + const succ = edge.successor_task_id + if (!pred || !succ || !byID.has(pred) || !byID.has(succ)) continue + incoming.set(succ, (incoming.get(succ) ?? 0) + 1) + outgoing.get(pred)?.push(succ) + } + + const queue = taskList + .filter((task) => task.id && (incoming.get(task.id) ?? 0) === 0) + .map((task) => task.id as string) + const levels = new Map<string, number>() + for (const id of queue) levels.set(id, 0) + + while (queue.length > 0) { + const currentID = queue.shift() as string + const currentLevel = levels.get(currentID) ?? 0 + for (const nextID of outgoing.get(currentID) ?? []) { + levels.set(nextID, Math.max(levels.get(nextID) ?? 0, currentLevel + 1)) + incoming.set(nextID, (incoming.get(nextID) ?? 0) - 1) + if ((incoming.get(nextID) ?? 0) <= 0) queue.push(nextID) + } + } + + for (const task of taskList) { + if (task.id && !levels.has(task.id)) levels.set(task.id, 0) + } + + return levels +} + +export function buildTaskLevelMapWithRoot( + tasks: RunInspectorTask[], + dependencies: DagRelation[], + rootID: string, +): Map<string, number> { + if (!rootID || tasks.length <= 1) return buildTaskLevels(tasks, dependencies) + + const rootTask = tasks.find((task) => task.id === rootID) + if (!rootTask) return buildTaskLevels(tasks, dependencies) + + const childTasks = tasks.filter((task) => task.id && task.id !== rootID) + const childEdges = dependencies.filter((edge) => + edge.predecessor_task_id !== rootID && edge.successor_task_id !== rootID, + ) + const childLevels = buildTaskLevels(childTasks, childEdges) + const levels = new Map([[rootID, 0]]) + for (const task of childTasks) { + if (!task.id) continue + levels.set(task.id, (childLevels.get(task.id) ?? 
0) + 1) + } + return levels +} + +export function inferTaskNodeKind( + task: RunInspectorTask, + level: number, + maxLevel: number, + rootID: string, + hasVerification: (taskID: string) => boolean, +): TaskNodeKind { + const text = `${task.goal ?? ''} ${task.worker_profile ?? ''}`.toLowerCase() + if ((task.id && rootID === task.id) || level === 0) return 'trigger' + if (task.status === 'verifying' || (task.id && hasVerification(task.id))) return 'verify' + if (text.includes('search') || text.includes('web')) return 'search' + if (text.includes('memory') || text.includes('blackboard')) return 'memory' + if (text.includes('merge') || text.includes('aggregate') || text.includes('combine')) return 'merge' + if (level === maxLevel || text.includes('output') || text.includes('deliver') || text.includes('final')) return 'output' + if (text.includes('plan') || text.includes('decompose')) return 'planner' + if (text.includes('tool') || text.includes('api') || text.includes('exec')) return 'tool' + return 'llm' +} + +export interface DagGraphResult { + taskNodes: ComputedRef<Node<TaskFlowNodeData>[]> + laneNodes: ComputedRef<Node<LaneNodeData>[]> + nodes: ComputedRef<Node[]> + edges: ComputedRef<Edge[]> + levelMap: ComputedRef<Map<string, number>> + maxLevel: ComputedRef<number> + kindByTaskID: ComputedRef<Map<string, TaskNodeKind>> + selectedRelated: ComputedRef<Set<string>> +} + +export function useDagGraph( + inspector: ComputedRef<RunInspectorPayload | null>, + selectedTaskID: ComputedRef<string>, +): DagGraphResult { + const tasks = computed(() => inspector.value?.tasks ?? []) + const deps = computed(() => inspector.value?.dependencies ?? []) + const verifications = computed(() => inspector.value?.verifications ?? []) + const rootID = computed(() => String(inspector.value?.run.root_task_id ?? 
'')) + const relations = computed(() => buildDagRelations(tasks.value, deps.value)) + + const levelMap = computed(() => buildTaskLevelMapWithRoot(tasks.value, relations.value, rootID.value)) + + const maxLevel = computed(() => { + let max = 0 + for (const value of levelMap.value.values()) { + if (value > max) max = value + } + return max + }) + + const taskHasVerification = (taskID: string) => + verifications.value.some((item) => String(item.task_id ?? '') === taskID) + + const kindByTaskID = computed(() => { + const map = new Map<string, TaskNodeKind>() + for (const task of tasks.value) { + if (!task.id) continue + const level = levelMap.value.get(task.id) ?? 0 + map.set(task.id, inferTaskNodeKind(task, level, maxLevel.value, rootID.value, taskHasVerification)) + } + return map + }) + + const selectedRelated = computed(() => { + const related = new Set<string>() + const id = selectedTaskID.value + if (!id) return related + related.add(id) + for (const edge of relations.value) { + if (edge.predecessor_task_id === id && edge.successor_task_id) { + related.add(edge.successor_task_id) + } + if (edge.successor_task_id === id && edge.predecessor_task_id) { + related.add(edge.predecessor_task_id) + } + } + return related + }) + + const positions = computed(() => { + const graph = new dagre.graphlib.Graph() + graph.setGraph({ rankdir: 'LR', ranksep: RANK_SEP, nodesep: NODE_SEP, marginx: 32, marginy: 48 }) + graph.setDefaultEdgeLabel(() => ({})) + + for (const task of tasks.value) { + if (!task.id) continue + graph.setNode(task.id, { width: NODE_WIDTH, height: NODE_HEIGHT }) + } + for (const edge of relations.value) { + const pred = edge.predecessor_task_id + const succ = edge.successor_task_id + if (!pred || !succ) continue + if (!graph.hasNode(pred) || !graph.hasNode(succ)) continue + graph.setEdge(pred, succ) + } + dagre.layout(graph) + + const result = new Map<string, { x: number, y: number }>() + for (const task of tasks.value) { + if (!task.id) continue + const node = graph.node(task.id) as { x?: number, y?: number } | undefined 
+ if (!node || typeof node.x !== 'number' || typeof node.y !== 'number') continue + result.set(task.id, { x: node.x - NODE_WIDTH / 2, y: node.y - NODE_HEIGHT / 2 }) + } + return result + }) + + const taskNodes = computed[]>(() => + tasks.value + .filter((task): task is RunInspectorTask & { id: string } => Boolean(task.id)) + .map>((task) => { + const level = levelMap.value.get(task.id) ?? 0 + const kind = kindByTaskID.value.get(task.id) ?? 'llm' + const isRoot = rootID.value === task.id + const isSelected = selectedTaskID.value === task.id + const hasSelection = selectedTaskID.value.length > 0 + const isRelated = selectedRelated.value.has(task.id) + + return { + id: task.id, + type: 'taskFlow', + position: positions.value.get(task.id) ?? { x: 0, y: 0 }, + data: { + task, + isRoot, + isSelected, + isRelated, + hasSelection, + level, + kind, + maxLevel: maxLevel.value, + }, + draggable: false, + selectable: false, + connectable: false, + width: NODE_WIDTH, + height: NODE_HEIGHT, + zIndex: 1, + } + }), + ) + + const laneNodes = computed[]>(() => { + const byLevel = new Map() + for (const task of tasks.value) { + if (!task.id) continue + const pos = positions.value.get(task.id) + if (!pos) continue + const lvl = levelMap.value.get(task.id) ?? 
0 + const top = pos.y + const bottom = pos.y + NODE_HEIGHT + const entry = byLevel.get(lvl) + if (entry) { + entry.x = Math.min(entry.x, pos.x) + entry.top = Math.min(entry.top, top) + entry.bottom = Math.max(entry.bottom, bottom) + entry.count += 1 + } else { + byLevel.set(lvl, { x: pos.x, top, bottom, count: 1 }) + } + } + const sorted = Array.from(byLevel.entries()).sort((a, b) => a[0] - b[0]) + if (sorted.length === 0) return [] + + let globalTop = Infinity + let globalBottom = -Infinity + for (const [, e] of sorted) { + globalTop = Math.min(globalTop, e.top) + globalBottom = Math.max(globalBottom, e.bottom) + } + + return sorted.map(([level, e]) => { + const isRootLane = level === 0 && e.count === 1 && tasks.value.some((task) => task.id === rootID.value) + const x = e.x - LANE_PAD_X + const width = NODE_WIDTH + LANE_PAD_X * 2 + const top = globalTop - LANE_PAD_Y_TOP + const height = (globalBottom - globalTop) + LANE_PAD_Y_TOP + LANE_PAD_Y_BOTTOM + return { + id: `lane-${level}`, + type: 'lane', + position: { x, y: top }, + data: { level, count: e.count, isRootLane }, + draggable: false, + selectable: false, + connectable: false, + focusable: false, + deletable: false, + width, + height, + zIndex: -10, + style: { width: `${width}px`, height: `${height}px`, pointerEvents: 'none' as const }, + } + }) + }) + + const nodes = computed(() => [...laneNodes.value, ...taskNodes.value]) + + const edges = computed(() => + relations.value + .filter((edge) => edge.predecessor_task_id && edge.successor_task_id) + .map((edge) => { + const id = edge.id || `e-${edge.predecessor_task_id}-${edge.successor_task_id}` + const isActive = Boolean( + selectedTaskID.value && ( + edge.predecessor_task_id === selectedTaskID.value || + edge.successor_task_id === selectedTaskID.value + ), + ) + return { + id: String(id), + source: edge.predecessor_task_id as string, + target: edge.successor_task_id as string, + type: 'smoothstep', + updatable: false, + selectable: false, + focusable: false, 
+ class: [ + isActive ? 'memoh-edge-active' : 'memoh-edge-default', + edge.structural ? 'memoh-edge-structural' : '', + ].filter(Boolean).join(' '), + markerEnd: { + type: MarkerType.ArrowClosed, + color: isActive ? EDGE_COLOR_ACTIVE : EDGE_COLOR_DEFAULT, + width: 14, + height: 14, + }, + style: { + strokeWidth: isActive ? 1.8 : 1.25, + }, + } + }), + ) + + return { taskNodes, laneNodes, nodes, edges, levelMap, maxLevel, kindByTaskID, selectedRelated } +} + +export const TASK_FLOW_NODE_DIMENSIONS = { + width: NODE_WIDTH, + height: NODE_HEIGHT, +} diff --git a/apps/web/src/pages/orchestration/composables/use-flow-graph.ts b/apps/web/src/pages/orchestration/composables/use-flow-graph.ts new file mode 100644 index 000000000..588724917 --- /dev/null +++ b/apps/web/src/pages/orchestration/composables/use-flow-graph.ts @@ -0,0 +1,801 @@ +import { computed, type ComputedRef } from 'vue' +import type { Edge, Node } from '@vue-flow/core' +import type { RunFlowSpan, RunInspectorPayload, RunInspectorTask } from '../model' +import { compactTaskTitle, shortId } from '../model' + +export type FlowLaneKind = 'planning' | 'attempt' | 'verification' | 'checkpoint' | 'system' + +export interface FlowLaneNodeData { + key: string + kind: FlowLaneKind + label: string + subtitle: string + count: number +} + +export interface FlowSpanNodeData { + span: RunFlowSpan + isSelected: boolean + isRelated: boolean + hasSelection: boolean + canRetry?: boolean + isRetrying?: boolean + onRetryTask?: (taskID: string) => void + index: number + total: number + taskTitle: string + taskStatus: string + laneKey: string + laneLabel: string + width: number + offsetLabel: string + durationLabel: string + scaleMode: 'time' | 'sequence' +} + +export interface FlowGapNodeData { + laneKey: string + label: string + width: number + isGrouped: boolean + groupLabel: string +} + +export interface FlowSummary { + steps: number + planning: number + attempts: number + actions: number +} + +export type FlowGraphNode = 
Node<FlowLaneNodeData> | Node<FlowSpanNodeData> | Node<FlowGapNodeData> + +export interface FlowGraphResult { + nodes: ComputedRef<FlowGraphNode[]> + edges: ComputedRef<Edge[]> + summary: ComputedRef<FlowSummary> +} + +const LABEL_WIDTH = 184 +const TRACK_PADDING_X = 28 +const TRACK_WIDTH_MIN = 360 +const TRACK_WIDTH_MAX = 1200 +const TRACK_WIDTH_PER_SPAN = 120 +const LANE_TRACK_WIDTH_MIN = 240 +const LANE_MIN_OVERLAP_X = 80 +const LANE_HEIGHT = 154 +const LANE_SPAN_TOP = 44 +const LANE_BOTTOM_PADDING = 18 +const LANE_GAP = 10 +const SPAN_HEIGHT = 92 +const SPAN_WIDTH_MIN = 220 +const SPAN_WIDTH_MAX = 420 +const SPAN_ROW_GAP_X = 16 +const UNIT_FALLBACK_DURATION = 1 +const GAP_RATIO_THRESHOLD = 0.25 +const GAP_MULTIPLE_THRESHOLD = 3 +const GAP_WIDTH_MIN = 72 +const GAP_WIDTH_MAX = 128 +const GAP_GROUP_OVERLAP_THRESHOLD = 0.6 + +interface SpanRange { + span: RunFlowSpan + start: number + end: number +} + +interface LaneGapSegment { + laneKey: string + realStart: number + realEnd: number + duration: number + localX: number + visualX: number + width: number + label: string + groupID: string + isGrouped: boolean +} + +interface LaneCompression { + laneStart: number + laneEnd: number + laneDuration: number + laneX: number + gaps: LaneGapSegment[] + mapValue: (value: number) => number +} + +function safeStartSeq(span: RunFlowSpan): number { + const value = Number(span.start_seq ?? 0) + return Number.isFinite(value) ? value : 0 +} + +function safeEndSeq(span: RunFlowSpan): number { + const start = safeStartSeq(span) + const value = Number(span.end_seq ?? 0) + if (!Number.isFinite(value) || value <= 0) return start + UNIT_FALLBACK_DURATION + return Math.max(value, start + UNIT_FALLBACK_DURATION) +} + +function safeStartTime(span: RunFlowSpan): number { + const raw = span.started_at || span.finished_at + if (!raw) return 0 + const t = new Date(raw).getTime() + return Number.isFinite(t) ? t : 0 +} + +function safeEndTime(span: RunFlowSpan, now: number): number { + const finished = span.finished_at ? 
new Date(span.finished_at).getTime() : 0 + if (Number.isFinite(finished) && finished > 0) return finished + + const start = safeStartTime(span) + if (start <= 0) return 0 + + const status = String(span.status ?? '').trim() + if (['active', 'running', 'dispatching', 'verifying'].includes(status)) { + return Math.max(now, start + 1000) + } + return start + 1000 +} + +function spanSortKey(span: RunFlowSpan): [number, number, string] { + return [safeStartSeq(span), safeStartTime(span), String(span.id ?? '')] +} + +function compareSpans(a: RunFlowSpan, b: RunFlowSpan): number { + const ka = spanSortKey(a) + const kb = spanSortKey(b) + if (ka[0] !== kb[0]) return ka[0] - kb[0] + if (ka[1] !== kb[1]) return ka[1] - kb[1] + return ka[2].localeCompare(kb[2]) +} + +function nodeID(span: RunFlowSpan, fallbackIndex: number): string { + const id = String(span.id ?? '').trim() + return id || `span-${fallbackIndex}` +} + +function resolveTaskTitle(taskID: string, tasks: RunInspectorTask[]): string { + if (!taskID) return '' + const task = tasks.find((item) => item.id === taskID) + if (!task) return shortId(taskID) + return compactTaskTitle(task.goal ?? '', task.id ?? '') +} + +function resolveTaskStatus(taskID: string, tasks: RunInspectorTask[]): string { + if (!taskID) return '' + const task = tasks.find((item) => item.id === taskID) + return String(task?.status ?? '').trim() +} + +function laneKind(span: RunFlowSpan): FlowLaneKind { + switch (span.kind) { + case 'planning': + case 'replanning': + return 'planning' + case 'attempt': + case 'attempt_finalize': + return 'attempt' + case 'verification': + return 'verification' + case 'checkpoint': + case 'checkpoint_resume': + return 'checkpoint' + default: + return 'system' + } +} + +function laneKeyForSpan(span: RunFlowSpan): string { + const kind = laneKind(span) + const taskID = String(span.task_id ?? 
'').trim() + if (kind === 'attempt' && taskID) return `task:${taskID}` + return kind +} + +function laneLabelForSpan(span: RunFlowSpan, tasks: RunInspectorTask[]): string { + const kind = laneKind(span) + const taskID = String(span.task_id ?? '').trim() + if (kind === 'attempt' && taskID) return resolveTaskTitle(taskID, tasks) + switch (kind) { + case 'planning': + return 'Planning' + case 'attempt': + return 'Attempt' + case 'verification': + return 'Verification' + case 'checkpoint': + return 'Checkpoint' + default: + return 'System' + } +} + +function laneSubtitleForSpan(span: RunFlowSpan, tasks: RunInspectorTask[]): string { + const kind = laneKind(span) + const taskID = String(span.task_id ?? '').trim() + if (kind === 'attempt') return taskID ? shortId(taskID, 12) : '' + if (kind === 'planning') return '' + if (kind === 'verification') return '' + if (kind === 'checkpoint') return '' + return taskID ? resolveTaskTitle(taskID, tasks) : 'run span' +} + +function laneRank(kind: FlowLaneKind): number { + switch (kind) { + case 'planning': + return 0 + case 'attempt': + return 1 + case 'verification': + return 2 + case 'checkpoint': + return 3 + default: + return 4 + } +} + +function formatDuration(ms: number): string { + if (!Number.isFinite(ms) || ms <= 0) return '' + const seconds = Math.max(1, Math.round(ms / 1000)) + if (seconds < 60) return `${seconds}s` + const minutes = Math.floor(seconds / 60) + const rest = seconds % 60 + if (minutes < 60) return rest > 0 ? `${minutes}m ${rest}s` : `${minutes}m` + const hours = Math.floor(minutes / 60) + const minuteRest = minutes % 60 + return minuteRest > 0 ? `${hours}h ${minuteRest}m` : `${hours}h` +} + +function formatSeqRange(start: number, end: number): string { + return start === end ? 
`#${start}` : `#${start}-${end}` +} + +function clamp(min: number, max: number, value: number): number { + return Math.min(max, Math.max(min, value)) +} + +function median(values: number[]): number { + const sorted = values.filter((value) => Number.isFinite(value) && value > 0).sort((a, b) => a - b) + if (sorted.length === 0) return 0 + const mid = Math.floor(sorted.length / 2) + if (sorted.length % 2 === 1) return sorted[mid] ?? 0 + return ((sorted[mid - 1] ?? 0) + (sorted[mid] ?? 0)) / 2 +} + +function overlapDuration(aStart: number, aEnd: number, bStart: number, bEnd: number): number { + return Math.max(0, Math.min(aEnd, bEnd) - Math.max(aStart, bStart)) +} + +function buildLaneCompression( + laneKey: string, + ranges: SpanRange[], + domainStart: number, + domainRange: number, + trackWidth: number, + mode: 'time' | 'sequence', +): LaneCompression { + const ordered = [...ranges].sort((a, b) => { + if (a.start !== b.start) return a.start - b.start + return a.end - b.end + }) + const laneStart = Math.min(...ordered.map((item) => item.start).filter((value) => value > 0)) + const normalizedLaneStart = Number.isFinite(laneStart) ? 
laneStart : domainStart + let laneEnd = Math.max(...ordered.map((item) => item.end)) + if (!Number.isFinite(laneEnd) || laneEnd <= normalizedLaneStart) { + laneEnd = normalizedLaneStart + UNIT_FALLBACK_DURATION + } + const laneDuration = Math.max(UNIT_FALLBACK_DURATION, laneEnd - normalizedLaneStart) + const scale = trackWidth / Math.max(UNIT_FALLBACK_DURATION, domainRange) + const laneX = ((normalizedLaneStart - domainStart) / Math.max(UNIT_FALLBACK_DURATION, domainRange)) * trackWidth + + if (mode !== 'time' || ordered.length < 2) { + return { + laneStart: normalizedLaneStart, + laneEnd, + laneDuration, + laneX: Math.max(0, laneX), + gaps: [], + mapValue: (value: number) => Math.max(0, (value - normalizedLaneStart) * scale), + } + } + + const intervals: Array<{ start: number, end: number }> = [] + for (const item of ordered) { + const start = Math.max(normalizedLaneStart, item.start) + const end = Math.max(item.end, start + UNIT_FALLBACK_DURATION) + intervals.push({ start, end }) + } + + const merged: Array<{ start: number, end: number }> = [] + for (const interval of intervals) { + const last = merged[merged.length - 1] + if (!last || interval.start > last.end) { + merged.push({ ...interval }) + continue + } + last.end = Math.max(last.end, interval.end) + } + + const gapCandidates: Array<{ start: number, end: number, duration: number }> = [] + for (let i = 1; i < merged.length; i += 1) { + const previous = merged[i - 1] + const next = merged[i] + if (!previous || !next) continue + const duration = next.start - previous.end + if (duration > 0) { + gapCandidates.push({ start: previous.end, end: next.start, duration }) + } + } + + const typicalGap = median(gapCandidates.map((gap) => gap.duration)) + const collapsed = gapCandidates + .filter((gap) => { + return shouldCollapseGap(gap.duration, laneDuration, typicalGap) + }) + .map((gap) => { + const width = collapsedGapWidth(gap.duration, laneDuration, trackWidth) + return { + laneKey, + realStart: gap.start, + 
realEnd: gap.end, + duration: gap.duration, + localX: 0, + visualX: 0, + width, + label: `+${formatDuration(gap.duration)} gap`, + groupID: '', + isGrouped: false, + } + }) + + function mapValue(value: number): number { + const raw = Math.max(0, (value - normalizedLaneStart) * scale) + let adjustment = 0 + for (const gap of collapsed) { + const originalWidth = gap.duration * scale + const removedWidth = Math.max(0, originalWidth - gap.width) + if (value >= gap.realEnd) { + adjustment += removedWidth + continue + } + if (value > gap.realStart) { + const progress = (value - gap.realStart) / gap.duration + const compressedProgress = gap.width * clamp(0, 1, progress) + const originalProgress = (value - gap.realStart) * scale + adjustment += Math.max(0, originalProgress - compressedProgress) + } + } + return Math.max(0, raw - adjustment) + } + + for (const gap of collapsed) { + gap.localX = mapValue(gap.realStart) + gap.visualX = gap.localX + } + + return { + laneStart: normalizedLaneStart, + laneEnd, + laneDuration, + laneX: Math.max(0, laneX), + gaps: collapsed, + mapValue, + } +} + +function assignGapGroups(gaps: LaneGapSegment[]): void { + let nextGroup = 1 + for (let i = 0; i < gaps.length; i += 1) { + const gap = gaps[i] + if (!gap) continue + if (!gap.groupID) { + gap.groupID = `gap-group-${nextGroup}` + nextGroup += 1 + } + for (let j = i + 1; j < gaps.length; j += 1) { + const other = gaps[j] + if (!other) continue + const overlap = overlapDuration(gap.realStart, gap.realEnd, other.realStart, other.realEnd) + const base = Math.min(gap.duration, other.duration) + const ratio = base > 0 ? overlap / base : 0 + if (ratio >= GAP_GROUP_OVERLAP_THRESHOLD) { + other.groupID = gap.groupID + } + } + } + + const counts = new Map() + for (const gap of gaps) { + counts.set(gap.groupID, (counts.get(gap.groupID) ?? 0) + 1) + } + for (const gap of gaps) { + gap.isGrouped = (counts.get(gap.groupID) ?? 
0) > 1 + } +} + +function shouldCollapseGap(duration: number, laneDuration: number, typicalGap: number): boolean { + if (laneDuration <= 0 || duration <= 0) return false + const gapRatio = duration / laneDuration + const gapMultiple = typicalGap > 0 ? duration / typicalGap : Infinity + return gapRatio >= GAP_RATIO_THRESHOLD && gapMultiple >= GAP_MULTIPLE_THRESHOLD +} + +function collapsedGapWidth(duration: number, laneDuration: number, trackWidth: number): number { + const gapRatio = laneDuration > 0 ? duration / laneDuration : 0 + return clamp(GAP_WIDTH_MIN, GAP_WIDTH_MAX, trackWidth * Math.min(0.12, gapRatio * 0.35)) +} + +function alignGroupedGaps(gaps: LaneGapSegment[], lanes: Map<string, LaneCompression>): void { + const byGroup = new Map<string, LaneGapSegment[]>() + for (const gap of gaps) { + const list = byGroup.get(gap.groupID) ?? [] + list.push(gap) + byGroup.set(gap.groupID, list) + } + + for (const group of byGroup.values()) { + if (group.length <= 1) { + const gap = group[0] + if (gap) gap.visualX = gap.localX + continue + } + + const absolutePositions = group + .map((gap) => { + const lane = lanes.get(gap.laneKey) + return (lane?.laneX ?? 0) + LABEL_WIDTH + TRACK_PADDING_X + gap.localX + }) + .sort((a, b) => a - b) + const mid = Math.floor(absolutePositions.length / 2) + const groupX = absolutePositions.length % 2 === 1 + ? absolutePositions[mid] ?? 0 + : ((absolutePositions[mid - 1] ?? 0) + (absolutePositions[mid] ?? 0)) / 2 + + for (const gap of group) { + const lane = lanes.get(gap.laneKey) + const laneTrackX = (lane?.laneX ?? 
0) + LABEL_WIDTH + TRACK_PADDING_X + gap.visualX = Math.max(0, groupX - laneTrackX) + } + } +} + +function addTrailingLaneGaps( + orderedLanes: Array, + compressions: Map, + trackWidth: number, +): void { + for (let index = 0; index < orderedLanes.length - 1; index += 1) { + const lane = orderedLanes[index] + const nextLane = orderedLanes[index + 1] + if (!lane || !nextLane) continue + + const compression = compressions.get(lane.key) + const nextCompression = compressions.get(nextLane.key) + if (!compression || !nextCompression) continue + + const duration = nextCompression.laneStart - compression.laneEnd + if (!shouldCollapseGap(duration, Math.max(compression.laneDuration, nextCompression.laneStart - compression.laneStart), 0)) { + continue + } + + const width = collapsedGapWidth(duration, Math.max(compression.laneDuration, nextCompression.laneStart - compression.laneStart), trackWidth) + const localX = compression.mapValue(compression.laneEnd) + SPAN_ROW_GAP_X + compression.gaps.push({ + laneKey: lane.key, + realStart: compression.laneEnd, + realEnd: nextCompression.laneStart, + duration, + localX, + visualX: localX, + width, + label: `+${formatDuration(duration)} gap`, + groupID: '', + isGrouped: false, + }) + } +} + +function ensureLaneOverlaps(lanes: Array, metrics: Map): void { + if (lanes.length <= 1) return + + const ordered = [...lanes] + .map((lane) => ({ lane, metrics: metrics.get(lane.key) })) + .filter((item): item is { lane: FlowLaneNodeData & { firstStart: number }, metrics: { x: number, y: number, height: number, contentWidth: number } } => Boolean(item.metrics)) + .sort((a, b) => a.metrics.x - b.metrics.x) + + for (let index = 1; index < ordered.length; index += 1) { + const previous = ordered[index - 1]?.metrics + const current = ordered[index]?.metrics + if (!previous || !current) continue + + const previousRight = previous.x + previous.contentWidth + const requiredRight = current.x + LANE_MIN_OVERLAP_X + if (previousRight < requiredRight) { + 
previous.contentWidth = requiredRight - previous.x + } + } +} + +export function useFlowGraph( + inspector: ComputedRef, + selectedTaskID: ComputedRef, +): FlowGraphResult { + const spans = computed(() => { + const list = inspector.value?.flow_spans ?? [] + return [...list].sort(compareSpans) + }) + + const tasks = computed(() => inspector.value?.tasks ?? []) + + const scaleMode = computed<'time' | 'sequence'>(() => { + const withTime = spans.value.filter((span) => safeStartTime(span) > 0) + return withTime.length >= 2 ? 'time' : 'sequence' + }) + + const summary = computed(() => { + let planning = 0 + let attempts = 0 + let actions = 0 + for (const span of spans.value) { + if (span.kind === 'planning' || span.kind === 'replanning') planning += 1 + if (span.kind === 'attempt') attempts += 1 + const count = Number(span.action_count ?? 0) + if (Number.isFinite(count)) actions += count + } + return { steps: spans.value.length, planning, attempts, actions } + }) + + const nodes = computed(() => { + const total = spans.value.length + const hasSelection = selectedTaskID.value.length > 0 + if (total === 0) return [] + + const now = Date.now() + const mode = scaleMode.value + const spanRanges = spans.value.map((span) => { + const start = mode === 'time' ? safeStartTime(span) : safeStartSeq(span) + const end = mode === 'time' ? safeEndTime(span, now) : safeEndSeq(span) + const normalizedStart = start > 0 ? 
start : 0 + const normalizedEnd = Math.max(end || 0, normalizedStart + UNIT_FALLBACK_DURATION) + return { span, start: normalizedStart, end: normalizedEnd } + }) + + let domainStart = Math.min(...spanRanges.map((item) => item.start).filter((value) => value > 0)) + if (!Number.isFinite(domainStart)) domainStart = 0 + let domainEnd = Math.max(...spanRanges.map((item) => item.end)) + if (!Number.isFinite(domainEnd) || domainEnd <= domainStart) { + domainEnd = domainStart + UNIT_FALLBACK_DURATION + } + + const domainRange = Math.max(UNIT_FALLBACK_DURATION, domainEnd - domainStart) + const trackWidth = Math.min(TRACK_WIDTH_MAX, Math.max(TRACK_WIDTH_MIN, total * TRACK_WIDTH_PER_SPAN)) + const spanIDToRange = new Map(spanRanges.map((item, index) => [nodeID(item.span, index), item])) + + const laneMap = new Map() + for (const item of spanRanges) { + const key = laneKeyForSpan(item.span) + const kind = laneKind(item.span) + const existing = laneMap.get(key) + if (existing) { + existing.count += 1 + existing.firstStart = Math.min(existing.firstStart, item.start) + continue + } + laneMap.set(key, { + key, + kind, + label: laneLabelForSpan(item.span, tasks.value), + subtitle: laneSubtitleForSpan(item.span, tasks.value), + count: 1, + firstStart: item.start, + }) + } + + const lanes = [...laneMap.values()].sort((a, b) => { + const rankDiff = laneRank(a.kind) - laneRank(b.kind) + if (rankDiff !== 0) return rankDiff + if (a.firstStart !== b.firstStart) return a.firstStart - b.firstStart + return a.label.localeCompare(b.label) + }) + const laneIndex = new Map(lanes.map((lane, index) => [lane.key, index])) + + const baseLaneWidth = LABEL_WIDTH + TRACK_PADDING_X * 2 + LANE_TRACK_WIDTH_MIN + const rangesByLane = new Map() + for (const item of spanRanges) { + const laneKey = laneKeyForSpan(item.span) + const list = rangesByLane.get(laneKey) ?? 
[] + list.push(item) + rangesByLane.set(laneKey, list) + } + + const laneCompression = new Map() + for (const lane of lanes) { + laneCompression.set( + lane.key, + buildLaneCompression(lane.key, rangesByLane.get(lane.key) ?? [], domainStart, domainRange, trackWidth, mode), + ) + } + if (mode === 'time') { + addTrailingLaneGaps(lanes, laneCompression, trackWidth) + } + + const rowRightsByLane = new Map() + const laneMaxRight = new Map() + + const spanLayouts = spans.value.map((span, index) => { + const id = nodeID(span, index) + const taskID = String(span.task_id ?? '').trim() + const isSelected = hasSelection && taskID === selectedTaskID.value + const laneKey = laneKeyForSpan(span) + const range = spanIDToRange.get(id) + const start = range?.start ?? domainStart + const end = range?.end ?? start + UNIT_FALLBACK_DURATION + const compression = laneCompression.get(laneKey) + const laneX = compression?.laneX ?? 0 + const localStart = compression?.mapValue(start) ?? ((start - domainStart) / domainRange) * trackWidth + const localEnd = compression?.mapValue(end) ?? ((end - domainStart) / domainRange) * trackWidth + const rawWidth = Math.max(1, localEnd - localStart) + const width = Math.min(SPAN_WIDTH_MAX, Math.max(SPAN_WIDTH_MIN, rawWidth)) + const x = laneX + LABEL_WIDTH + TRACK_PADDING_X + Math.max(0, localStart) + const right = x + width + const rowRights = rowRightsByLane.get(laneKey) ?? [] + let row = rowRights.findIndex((rowRight) => x >= rowRight + SPAN_ROW_GAP_X) + if (row < 0) row = rowRights.length + rowRights[row] = right + rowRightsByLane.set(laneKey, rowRights) + laneMaxRight.set(laneKey, Math.max(laneMaxRight.get(laneKey) ?? 
0, right)) + + return { + id, + span, + index, + taskID, + isSelected, + laneKey, + x, + row, + width, + start, + end, + } + }) + + const allGapSegments = [...laneCompression.values()].flatMap((compression) => compression.gaps) + assignGapGroups(allGapSegments) + alignGroupedGaps(allGapSegments, laneCompression) + for (const gap of allGapSegments) { + const compression = laneCompression.get(gap.laneKey) + const trackOrigin = (compression?.laneX ?? 0) + LABEL_WIDTH + TRACK_PADDING_X + let previousRight = 0 + for (const layout of spanLayouts) { + if (layout.laneKey !== gap.laneKey || layout.end > gap.realStart) continue + previousRight = Math.max(previousRight, layout.x + layout.width) + } + if (previousRight > 0) { + gap.visualX = Math.max(gap.visualX, previousRight - trackOrigin + SPAN_ROW_GAP_X) + } + } + + const laneMetrics = new Map() + let cursorY = 0 + for (const lane of lanes) { + const compression = laneCompression.get(lane.key) + const laneX = compression?.laneX ?? 0 + const rowCount = Math.max(1, rowRightsByLane.get(lane.key)?.length ?? 1) + const height = LANE_SPAN_TOP + rowCount * SPAN_HEIGHT + (rowCount - 1) * LANE_GAP + LANE_BOTTOM_PADDING + let laneRight = laneMaxRight.get(lane.key) ?? (laneX + baseLaneWidth) + for (const gap of compression?.gaps ?? []) { + laneRight = Math.max(laneRight, laneX + LABEL_WIDTH + TRACK_PADDING_X + gap.visualX + gap.width) + } + const contentWidth = Math.max(baseLaneWidth, laneRight - laneX + TRACK_PADDING_X) + laneMetrics.set(lane.key, { x: laneX, y: cursorY, height, contentWidth }) + cursorY += height + LANE_GAP + } + ensureLaneOverlaps(lanes, laneMetrics) + + const laneNodes = lanes.map>((lane) => { + const metrics = laneMetrics.get(lane.key) ?? 
{ x: 0, y: 0, height: LANE_HEIGHT, contentWidth: baseLaneWidth } + return { + id: `lane:${lane.key}`, + type: 'flowLane', + position: { x: metrics.x, y: metrics.y }, + data: { + key: lane.key, + kind: lane.kind, + label: lane.label, + subtitle: lane.subtitle, + count: lane.count, + }, + draggable: false, + selectable: false, + connectable: false, + width: metrics.contentWidth, + height: metrics.height, + zIndex: -10, + style: { pointerEvents: 'none' }, + } + }) + + const gapNodes = allGapSegments.map>((gap, index) => { + const metrics = laneMetrics.get(gap.laneKey) ?? { x: 0, y: 0, height: LANE_HEIGHT, contentWidth: baseLaneWidth } + return { + id: `gap:${gap.laneKey}:${index}:${Math.round(gap.realStart)}`, + type: 'flowGap', + position: { + x: metrics.x + LABEL_WIDTH + TRACK_PADDING_X + gap.visualX, + y: metrics.y + LANE_SPAN_TOP + (SPAN_HEIGHT - 22) / 2, + }, + data: { + laneKey: gap.laneKey, + label: gap.label, + width: gap.width, + isGrouped: gap.isGrouped, + groupLabel: gap.isGrouped ? gap.groupID.replace('gap-group-', 'group ') : '', + }, + draggable: false, + selectable: false, + connectable: false, + width: gap.width, + height: 22, + zIndex: 2, + style: { pointerEvents: 'none' }, + } + }) + + const spanNodes = spanLayouts.map>((layout) => { + const metrics = laneMetrics.get(layout.laneKey) ?? { x: 0, y: 0, height: LANE_HEIGHT, contentWidth: baseLaneWidth } + const y = metrics.y + LANE_SPAN_TOP + layout.row * (SPAN_HEIGHT + LANE_GAP) + const lane = lanes[laneIndex.get(layout.laneKey) ?? 0] + + return { + id: layout.id, + type: 'flowSpan', + position: { + x: layout.x, + y, + }, + data: { + span: layout.span, + isSelected: layout.isSelected, + isRelated: layout.isSelected, + hasSelection, + index: layout.index + 1, + total, + taskTitle: resolveTaskTitle(layout.taskID, tasks.value), + taskStatus: laneKind(layout.span) === 'attempt' ? resolveTaskStatus(layout.taskID, tasks.value) : '', + laneKey: layout.laneKey, + laneLabel: lane?.label ?? 
'', + width: layout.width, + offsetLabel: mode === 'time' + ? formatDuration(layout.start - domainStart) + : formatSeqRange(Math.round(layout.start), Math.round(layout.end)), + durationLabel: mode === 'time' + ? formatDuration(layout.end - layout.start) + : formatSeqRange(Math.round(layout.start), Math.round(layout.end)), + scaleMode: mode, + }, + draggable: false, + selectable: false, + connectable: false, + width: layout.width, + height: SPAN_HEIGHT, + zIndex: 1, + } + }) + + return [...laneNodes, ...gapNodes, ...spanNodes] + }) + + const edges = computed(() => []) + + return { nodes, edges, summary } +} + +export const FLOW_SPAN_NODE_DIMENSIONS = { + labelWidth: LABEL_WIDTH, + laneHeight: LANE_HEIGHT, + spanHeight: SPAN_HEIGHT, +} diff --git a/apps/web/src/pages/orchestration/composables/use-orchestration-meta.ts b/apps/web/src/pages/orchestration/composables/use-orchestration-meta.ts new file mode 100644 index 000000000..11c02c6f3 --- /dev/null +++ b/apps/web/src/pages/orchestration/composables/use-orchestration-meta.ts @@ -0,0 +1,188 @@ +import { useI18n } from 'vue-i18n' +import { + AlertCircle, + CheckCircle2, + Clock3, + GitMerge, + LoaderCircle, + ScanSearch, + ShieldCheck, + Sparkles, + Workflow, + Wrench, + type LucideIcon, +} from 'lucide-vue-next' + +export interface StatusMeta { + label: string + icon: LucideIcon + dot: string + chip: string + task: string +} + +export interface FlowKindMeta { + icon: LucideIcon + label: string + color: string +} + +export function useOrchestrationMeta() { + const { t } = useI18n() + + function statusMeta(status: string): StatusMeta { + switch (status) { + case 'created': + return { + label: t('orchestration.statusPending'), + icon: Clock3, + dot: 'bg-muted-foreground', + chip: 'border-border bg-muted/70 text-muted-foreground', + task: 'border-border bg-muted/30', + } + case 'idle': + return { + label: t('orchestration.statusIdle'), + icon: Clock3, + dot: 'bg-muted-foreground', + chip: 'border-border bg-muted/70 
text-muted-foreground', + task: 'border-border bg-muted/30', + } + case 'active': + return { + label: t('orchestration.statusActive'), + icon: LoaderCircle, + dot: 'bg-sky-500', + chip: 'border-sky-500/20 bg-sky-500/10 text-sky-700 dark:text-sky-300', + task: 'border-sky-500/30 bg-sky-500/8', + } + case 'completed': + return { + label: t('orchestration.statusSuccess'), + icon: CheckCircle2, + dot: 'bg-emerald-500', + chip: 'border-emerald-500/20 bg-emerald-500/10 text-emerald-700 dark:text-emerald-300', + task: 'bg-background', + } + case 'running': + return { + label: t('orchestration.statusRunning'), + icon: LoaderCircle, + dot: 'bg-sky-500', + chip: 'border-sky-500/20 bg-sky-500/10 text-sky-700 dark:text-sky-300', + task: 'border-sky-500/30 bg-sky-500/8', + } + case 'dispatching': + return { + label: t('orchestration.statusDispatching'), + icon: LoaderCircle, + dot: 'bg-sky-500', + chip: 'border-sky-500/20 bg-sky-500/10 text-sky-700 dark:text-sky-300', + task: 'border-sky-500/30 bg-sky-500/8', + } + case 'verifying': + return { + label: t('orchestration.statusVerifying'), + icon: LoaderCircle, + dot: 'bg-sky-500', + chip: 'border-sky-500/20 bg-sky-500/10 text-sky-700 dark:text-sky-300', + task: 'border-sky-500/30 bg-sky-500/8', + } + case 'waiting_human': + return { + label: t('orchestration.statusWaitingHuman'), + icon: Clock3, + dot: 'bg-amber-500', + chip: 'border-amber-500/20 bg-amber-500/10 text-amber-700 dark:text-amber-300', + task: 'border-amber-500/30 bg-amber-500/8', + } + case 'failed': + return { + label: t('orchestration.statusFailed'), + icon: AlertCircle, + dot: 'bg-rose-500', + chip: 'border-rose-500/20 bg-rose-500/10 text-rose-700 dark:text-rose-300', + task: 'border-rose-500/30 bg-rose-500/8', + } + case 'blocked': + return { + label: t('orchestration.statusBlocked'), + icon: AlertCircle, + dot: 'bg-rose-500', + chip: 'border-rose-500/20 bg-rose-500/10 text-rose-700 dark:text-rose-300', + task: 'border-rose-500/30 bg-rose-500/8', + } + case 
'cancelled': + return { + label: t('orchestration.statusCancelled'), + icon: AlertCircle, + dot: 'bg-rose-500', + chip: 'border-rose-500/20 bg-rose-500/10 text-rose-700 dark:text-rose-300', + task: 'border-rose-500/30 bg-rose-500/8', + } + default: + return { + label: status ? status.replaceAll('_', ' ') : t('orchestration.statusPending'), + icon: Clock3, + dot: 'bg-muted-foreground', + chip: 'border-border bg-muted/70 text-muted-foreground', + task: 'border-border bg-muted/30', + } + } + } + + function flowKindMeta(kind?: string): FlowKindMeta { + switch (kind) { + case 'planning': + return { + icon: Sparkles, + label: t('orchestration.flowPlanning'), + color: 'border-border bg-background text-foreground', + } + case 'replanning': + return { + icon: GitMerge, + label: t('orchestration.flowReplanning'), + color: 'border-amber-500/25 bg-amber-500/10 text-amber-700 dark:text-amber-300', + } + case 'verification': + return { + icon: ShieldCheck, + label: t('orchestration.flowVerification'), + color: 'border-emerald-500/25 bg-emerald-500/10 text-emerald-700 dark:text-emerald-300', + } + case 'checkpoint': + return { + icon: ScanSearch, + label: t('orchestration.flowCheckpoint'), + color: 'border-orange-500/25 bg-orange-500/10 text-orange-700 dark:text-orange-300', + } + case 'checkpoint_resume': + return { + icon: ScanSearch, + label: t('orchestration.flowCheckpointResume'), + color: 'border-orange-500/25 bg-orange-500/10 text-orange-700 dark:text-orange-300', + } + case 'attempt': + return { + icon: Wrench, + label: t('orchestration.flowAttempt'), + color: 'border-sky-500/25 bg-sky-500/10 text-sky-700 dark:text-sky-300', + } + case 'attempt_finalize': + return { + icon: Wrench, + label: t('orchestration.flowAttemptFinalize'), + color: 'border-sky-500/25 bg-sky-500/10 text-sky-700 dark:text-sky-300', + } + default: + return { + icon: Workflow, + label: t('orchestration.flowStep'), + color: 'border-border bg-muted/70 text-muted-foreground', + } + } + } + + return { 
statusMeta, flowKindMeta } +} diff --git a/apps/web/src/pages/orchestration/composables/use-run-event-stream.ts b/apps/web/src/pages/orchestration/composables/use-run-event-stream.ts new file mode 100644 index 000000000..4b8e173ab --- /dev/null +++ b/apps/web/src/pages/orchestration/composables/use-run-event-stream.ts @@ -0,0 +1,217 @@ +import { onBeforeUnmount, ref, watch, type Ref } from 'vue' + +export interface RunEventStreamEvent { + type: string + seq?: number + run_id?: string + task_id?: string + attempt_id?: string + payload?: Record<string, unknown> + [key: string]: unknown +} + +export type RunEventStreamStatus = 'idle' | 'connecting' | 'open' | 'closed' | 'error' + +interface UseRunEventStreamOptions { + runId: Ref + // Whether the stream should be open. Re-checked when runId changes and + // on every event, so callers can drop it as soon as the run terminates. + enabled: Ref<boolean> + // Called once per decoded event. Returning a Promise is fine; we do not + // await it, so handlers must be safe to fire-and-forget. + onEvent: (event: RunEventStreamEvent) => void | Promise<void> + // Override for the API base. Defaults to the /api proxy. + baseUrl?: string +} + +const DEFAULT_BASE_URL = '/api' + +// Reconnect backoff in ms. After the last value we keep retrying at that +// interval until enabled flips off. Numbers are intentionally relaxed: the +// server still has every event in Postgres, so a delayed reconnect just +// triggers one backfill page. +const RECONNECT_BACKOFF_MS = [500, 1000, 2000, 4000, 8000] + +/** + * Subscribes to /orchestration/runs/{runId}/watch over fetch + ReadableStream. + * EventSource is not used because it cannot send an Authorization header. + * Each decoded event goes to onEvent; status is exposed so the host page can + * show a connection indicator. 
+ * + * The stream closes itself when: + * - runId becomes empty + * - enabled flips to false + * - the component unmounts + * - the server closes the response (a reconnect is then scheduled while enabled) + */ +export function useRunEventStream(options: UseRunEventStreamOptions) { + const { runId, enabled, onEvent } = options + const baseUrl = (options.baseUrl ?? DEFAULT_BASE_URL).replace(/\/$/, '') + + const status = ref<RunEventStreamStatus>('idle') + const lastEventAt = ref<Date | null>(null) + const lastError = ref<unknown>(null) + + let controller: AbortController | null = null + let reconnectTimer: number | null = null + let reconnectAttempt = 0 + let lastSeq = 0 + let activeRunId = '' + + function clearReconnect() { + if (reconnectTimer !== null) { + window.clearTimeout(reconnectTimer) + reconnectTimer = null + } + } + + function close() { + clearReconnect() + if (controller) { + controller.abort() + controller = null + } + status.value = 'closed' + } + + function scheduleReconnect() { + if (!enabled.value || !activeRunId) return + const delay = RECONNECT_BACKOFF_MS[Math.min(reconnectAttempt, RECONNECT_BACKOFF_MS.length - 1)] + reconnectAttempt += 1 + reconnectTimer = window.setTimeout(() => { + reconnectTimer = null + void open() + }, delay) + } + + async function open() { + if (!enabled.value || !activeRunId) { + close() + return + } + clearReconnect() + if (controller) controller.abort() + const localController = new AbortController() + controller = localController + + const url = `${baseUrl}/orchestration/runs/${encodeURIComponent(activeRunId)}/watch${ + lastSeq > 0 ? 
`?after_seq=${lastSeq}` : '' + }` + const headers: HeadersInit = { + Accept: 'text/event-stream', + } + const token = localStorage.getItem('token') + if (token) headers.Authorization = `Bearer ${token}` + + status.value = 'connecting' + let response: Response + try { + response = await fetch(url, { + method: 'GET', + headers, + signal: localController.signal, + }) + } catch (err) { + if (localController.signal.aborted) return + lastError.value = err + status.value = 'error' + scheduleReconnect() + return + } + + if (!response.ok || !response.body) { + lastError.value = new Error(`watch stream returned ${response.status}`) + status.value = 'error' + scheduleReconnect() + return + } + + status.value = 'open' + reconnectAttempt = 0 + const reader = response.body.pipeThrough(new TextDecoderStream()).getReader() + let buffer = '' + + try { + while (true) { + const { value, done } = await reader.read() + if (done) break + buffer += value + const events = buffer.split('\n\n') + buffer = events.pop() ?? '' + for (const block of events) { + dispatchBlock(block) + } + } + } catch (err) { + if (!localController.signal.aborted) { + lastError.value = err + status.value = 'error' + scheduleReconnect() + } + return + } + + if (!localController.signal.aborted) { + // Server closed cleanly. Reconnect so the live tail stays open until + // the host disables it. 
+ status.value = 'closed' + scheduleReconnect() + } + } + + function dispatchBlock(block: string) { + const trimmed = block.trim() + if (!trimmed) return + let payload: string | null = null + for (const line of trimmed.split('\n')) { + if (line.startsWith('data:')) { + payload = line.slice(5).trim() + break + } + } + if (!payload) return + let parsed: RunEventStreamEvent + try { + parsed = JSON.parse(payload) as RunEventStreamEvent + } catch { + return + } + if (parsed.type === 'ping') return + if (typeof parsed.seq === 'number' && parsed.seq > lastSeq) { + lastSeq = parsed.seq + } + lastEventAt.value = new Date() + void onEvent(parsed) + } + + watch( + [runId, enabled], + ([newRunId, newEnabled]) => { + const trimmed = (newRunId ?? '').trim() + if (trimmed !== activeRunId) { + // New run: reset the cursor so the next stream backfills from the + // start of its own history. + lastSeq = 0 + reconnectAttempt = 0 + } + activeRunId = trimmed + if (!trimmed || !newEnabled) { + close() + status.value = 'idle' + return + } + void open() + }, + { immediate: true }, + ) + + onBeforeUnmount(() => { + close() + }) + + return { + status, + lastEventAt, + lastError, + } +} diff --git a/apps/web/src/pages/orchestration/env-resource-detail.vue b/apps/web/src/pages/orchestration/env-resource-detail.vue new file mode 100644 index 000000000..64fcb3177 --- /dev/null +++ b/apps/web/src/pages/orchestration/env-resource-detail.vue @@ -0,0 +1,327 @@ + + + diff --git a/apps/web/src/pages/orchestration/env-resource-new.vue b/apps/web/src/pages/orchestration/env-resource-new.vue new file mode 100644 index 000000000..14e28016e --- /dev/null +++ b/apps/web/src/pages/orchestration/env-resource-new.vue @@ -0,0 +1,212 @@ + + + diff --git a/apps/web/src/pages/orchestration/env-resources.vue b/apps/web/src/pages/orchestration/env-resources.vue new file mode 100644 index 000000000..17155e834 --- /dev/null +++ b/apps/web/src/pages/orchestration/env-resources.vue @@ -0,0 +1,158 @@ + + + diff --git 
a/apps/web/src/pages/orchestration/image-detail.vue b/apps/web/src/pages/orchestration/image-detail.vue new file mode 100644 index 000000000..e77d42056 --- /dev/null +++ b/apps/web/src/pages/orchestration/image-detail.vue @@ -0,0 +1,200 @@ + + +