From cef229d5c64ab02d832db4330b47062e006e7730 Mon Sep 17 00:00:00 2001 From: Kacper Wojciechowski <39823706+jog1t@users.noreply.github.com> Date: Sat, 7 Feb 2026 00:26:48 +0100 Subject: [PATCH] feat(workflow): connect workflow visualizer with live data --- .../actors/actor-inspector-context.tsx | 262 ++++++-- .../actors/workflow/actor-workflow-tab.tsx | 396 +----------- .../workflow/transform-workflow-history.ts | 226 +++++++ .../actors/workflow/workflow-visualizer.tsx | 565 ++++++++++++------ 4 files changed, 817 insertions(+), 632 deletions(-) create mode 100644 frontend/src/components/actors/workflow/transform-workflow-history.ts diff --git a/frontend/src/components/actors/actor-inspector-context.tsx b/frontend/src/components/actors/actor-inspector-context.tsx index 55dbe59ae8..012dc84eca 100644 --- a/frontend/src/components/actors/actor-inspector-context.tsx +++ b/frontend/src/components/actors/actor-inspector-context.tsx @@ -1,3 +1,5 @@ +import type { ReadRangeOptions, ReadRangeWire } from "@rivetkit/traces"; +import { decodeReadRangeWire } from "@rivetkit/traces/encoding"; import { mutationOptions, type QueryClient, @@ -10,19 +12,20 @@ import { createContext, useContext, useMemo, useRef } from "react"; import type ReconnectingWebSocket from "reconnectingwebsocket"; import { type Connection, + decodeWorkflowHistoryTransport, + type QueueStatus, type ToServer, TO_CLIENT_VERSIONED as toClient, TO_SERVER_VERSIONED as toServer, - type QueueStatus, } from "rivetkit/inspector"; -import type { ReadRangeOptions, ReadRangeWire } from "@rivetkit/traces"; -import { decodeReadRangeWire } from "@rivetkit/traces/encoding"; import { toast } from "sonner"; import { match } from "ts-pattern"; import z from "zod"; import { type ConnectionStatus, useWebSocket } from "../hooks/use-websocket"; import { useActorInspectorData } from "./hooks/use-actor-inspector-data"; import type { ActorId } from "./queries"; +import { transformWorkflowHistory } from "./workflow/transform-workflow-history"; +import type { WorkflowHistory } from "./workflow/workflow-types"; export const actorInspectorQueriesKeys = { actorState: (actorId: ActorId) => ["actor", actorId, "state"] as const, @@ -39,6 +42,10 @@ export const actorInspectorQueriesKeys = { actorQueueSize: (actorId: ActorId) => ["actor", actorId, "queue", "size"] as const, actorWakeUp: (actorId: ActorId) => ["actor", actorId, "wake-up"] as const, + actorWorkflowHistory: (actorId: ActorId) => + ["actor", actorId, "workflow-history"] as const, + actorIsWorkflowEnabled: (actorId: ActorId) => + ["actor", actorId, "is-workflow-enabled"] as const, }; type QueueStatusSummary = { @@ -61,6 +68,10 @@ interface ActorInspectorApi { getRpcs: () => Promise; getTraces: (options: ReadRangeOptions) => Promise; getQueueStatus: (limit: number) => Promise; + getWorkflowHistory: () => Promise<{ + history: WorkflowHistory | null; + isEnabled: boolean; + }>; getMetadata: () => Promise<{ version: string }>; } @@ -283,7 +294,10 @@ export const createDefaultActorInspectorContext = ({ actorQueueStatusQueryOptions(actorId: ActorId, limit: number) { return queryOptions({ staleTime: 0, - queryKey: actorInspectorQueriesKeys.actorQueueStatus(actorId, limit), + queryKey: actorInspectorQueriesKeys.actorQueueStatus( + actorId, + limit, + ), queryFn: () => { return api.getQueueStatus(limit); }, @@ -297,6 +311,24 @@ export const createDefaultActorInspectorContext = ({ }); }, + actorWorkflowHistoryQueryOptions(actorId: ActorId) { + return queryOptions({ + staleTime: Infinity, + queryKey: actorInspectorQueriesKeys.actorWorkflowHistory(actorId), + queryFn: () => { + return api.getWorkflowHistory(); + }, + }); + }, + + actorIsWorkflowEnabledQueryOptions(actorId: ActorId) { + return queryOptions({ + staleTime: Infinity, + queryKey: actorInspectorQueriesKeys.actorIsWorkflowEnabled(actorId), + queryFn: () => false, + }); + }, + actorPingQueryOptions(actorId: ActorId) { return queryOptions({ queryKey: ["actor", actorId, "ping"], @@ -470,9 +502,11 @@ export const ActorInspectorProvider = ({ const actionsManager = useRef(new ActionsManager()); - const { data: actorMetadata, isSuccess: isActorMetadataSuccess } = useQuery({ - ...actorMetadataQueryOptions({ actorId, credentials }), - }); + const { data: actorMetadata, isSuccess: isActorMetadataSuccess } = useQuery( + { + ...actorMetadataQueryOptions({ actorId, credentials }), + }, + ); const { isSuccess: isActorDataSuccess } = useActorInspectorData(actorId); @@ -519,19 +553,25 @@ export const ActorInspectorProvider = ({ }, executeAction: async (name, args) => { const { id, promise } = - actionsManager.current.createResolver(); + actionsManager.current.createResolver({ + name: "executeAction", + }); sendMessage( - serverMessage({ - body: { - tag: "ActionRequest", - val: { - id: BigInt(id), - name, - args: new Uint8Array(cbor.encode(args)).buffer, + serverMessage( + { + body: { + tag: "ActionRequest", + val: { + id: BigInt(id), + name, + args: new Uint8Array(cbor.encode(args)) + .buffer, + }, }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); return promise; @@ -539,29 +579,36 @@ export const ActorInspectorProvider = ({ patchState: async (state) => { sendMessage( - serverMessage({ - body: { - tag: "PatchStateRequest", - val: { - state: new Uint8Array(cbor.encode(state)) - .buffer, + serverMessage( + { + body: { + tag: "PatchStateRequest", + val: { + state: new Uint8Array(cbor.encode(state)) + .buffer, + }, }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); }, getConnections: async () => { - const { id, promise } = - actionsManager.current.createResolver(); + const { id, promise } = actionsManager.current.createResolver< + Connection[] + >({ name: "getConnections" }); sendMessage( - serverMessage({ - body: { - tag: "ConnectionsRequest", - val: { id: BigInt(id) }, + serverMessage( + { + body: { + tag: "ConnectionsRequest", + val: { id: BigInt(id) }, + }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); return promise; @@ -571,31 +618,38 @@ export const ActorInspectorProvider = ({ const { id, promise } = actionsManager.current.createResolver<{ isEnabled: boolean; state: unknown; - }>(); + }>({ name: "getState" }); sendMessage( - serverMessage({ - body: { - tag: "StateRequest", - val: { id: BigInt(id) }, + serverMessage( + { + body: { + tag: "StateRequest", + val: { id: BigInt(id) }, + }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); return promise; }, getRpcs() { - const { id, promise } = - actionsManager.current.createResolver(); + const { id, promise } = actionsManager.current.createResolver< + string[] + >({ name: "getRpcs" }); sendMessage( - serverMessage({ - body: { - tag: "RpcsListRequest", - val: { id: BigInt(id) }, + serverMessage( + { + body: { + tag: "RpcsListRequest", + val: { id: BigInt(id) }, + }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); return promise; @@ -603,21 +657,25 @@ export const ActorInspectorProvider = ({ getTraces: async ({ startMs, endMs, limit }) => { const { id, promise } = actionsManager.current.createResolver({ + name: "getTraces", timeoutMs: 10_000, }); sendMessage( - serverMessage({ - body: { - tag: "TraceQueryRequest", - val: { - id: BigInt(id), - startMs: BigInt(Math.floor(startMs)), - endMs: BigInt(Math.floor(endMs)), - limit: BigInt(limit), + serverMessage( + { + body: { + tag: "TraceQueryRequest", + val: { + id: BigInt(id), + startMs: BigInt(Math.floor(startMs)), + endMs: BigInt(Math.floor(endMs)), + limit: BigInt(limit), + }, }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), ); return promise; @@ -626,19 +684,47 @@ export const ActorInspectorProvider = ({ const safeLimit = Math.max(0, Math.floor(limit)); const { id, promise } = actionsManager.current.createResolver({ + name: "getQueueStatus", timeoutMs: 10_000, }); sendMessage( - serverMessage({ - body: { - tag: "QueueRequest", - val: { - id: BigInt(id), - limit: BigInt(safeLimit), + serverMessage( + { + body: { + tag: "QueueRequest", + val: { + id: BigInt(id), + limit: BigInt(safeLimit), + }, }, }, - }, inspectorProtocolVersion), + inspectorProtocolVersion, + ), + ); + + return promise; + }, + + getWorkflowHistory: async () => { + const { id, promise } = actionsManager.current.createResolver<{ + history: WorkflowHistory | null; + isEnabled: boolean; + }>({ + name: "getWorkflowHistory", + timeoutMs: 10_000, + }); + + sendMessage( + serverMessage( + { + body: { + tag: "WorkflowHistoryRequest", + val: { id: BigInt(id) }, + }, + }, + inspectorProtocolVersion, + ), ); return promise; @@ -688,9 +774,7 @@ const createMessageHandler = actionsManager: React.RefObject; }) => async (e: ReconnectingWebSocket.MessageEvent) => { - let message: ReturnType< - typeof toClient.deserializeWithEmbeddedVersion - >; + let message: ReturnType; try { message = toClient.deserializeWithEmbeddedVersion( new Uint8Array(await e.data.arrayBuffer()), @@ -721,6 +805,20 @@ const createMessageHandler = actorInspectorQueriesKeys.actorIsStateEnabled(actorId), body.val.isStateEnabled, ); + + queryClient.setQueryData( + actorInspectorQueriesKeys.actorIsWorkflowEnabled(actorId), + body.val.isWorkflowEnabled, + ); + + if (body.val.workflowHistory) { + queryClient.setQueryData( + actorInspectorQueriesKeys.actorWorkflowHistory(actorId), + transformWorkflowHistoryFromInspector( + body.val.workflowHistory, + ), + ); + } }) .with({ tag: "ConnectionsResponse" }, (body) => { const { rid } = body.val; @@ -789,6 +887,23 @@ const createMessageHandler = queryKey: ["actor", actorId, "queue"], }); }) + .with({ tag: "WorkflowHistoryUpdated" }, (body) => { + queryClient.setQueryData( + actorInspectorQueriesKeys.actorWorkflowHistory(actorId), + transformWorkflowHistoryFromInspector(body.val.history), + ); + }) + .with({ tag: "WorkflowHistoryResponse" }, (body) => { + const { rid } = body.val; + actionsManager.current.resolve(Number(rid), { + history: body.val.history + ? transformWorkflowHistoryFromInspector( + body.val.history, + ) + : null, + isEnabled: body.val.isWorkflowEnabled, + }); + }) .with({ tag: "Error" }, (body) => { if (body.val.message === INSPECTOR_ERROR_EVENTS_DROPPED) { return; @@ -809,6 +924,22 @@ function transformState(state: ArrayBuffer) { return cbor.decode(new Uint8Array(state)); } +function transformWorkflowHistoryFromInspector(raw: ArrayBuffer): { + history: WorkflowHistory | null; + isEnabled: boolean; +} { + try { + const decoded = decodeWorkflowHistoryTransport(raw); + return { + history: transformWorkflowHistory(decoded), + isEnabled: true, + }; + } catch (error) { + console.warn("Failed to decode workflow history", error); + return { history: null, isEnabled: true }; + } +} + function serverMessage(data: ToServer, version: number) { return toServer.serializeWithEmbeddedVersion(data, version); } @@ -820,6 +951,7 @@ class ActionsManager { createResolver(options?: { timeoutMs?: number; + name?: string; }): { id: number; promise: Promise } { const id = this.nextId++; const { promise, resolve, reject } = Promise.withResolvers(); @@ -829,7 +961,11 @@ class ActionsManager { // set a timeout to reject the promise if not resolved in time setTimeout(() => { if (this.suspensions.has(id)) { - reject(new Error("Action timed out")); + reject( + new Error( + `Action timed out: ${options?.name ?? "unknown"}`, + ), + ); this.suspensions.delete(id); } }, timeoutMs); diff --git a/frontend/src/components/actors/workflow/actor-workflow-tab.tsx b/frontend/src/components/actors/workflow/actor-workflow-tab.tsx index c607104b98..b3d435777a 100644 --- a/frontend/src/components/actors/workflow/actor-workflow-tab.tsx +++ b/frontend/src/components/actors/workflow/actor-workflow-tab.tsx @@ -1,379 +1,27 @@ import { faSpinnerThird, Icon } from "@rivet-gg/icons"; +import { useQuery } from "@tanstack/react-query"; import type { PropsWithChildren } from "react"; +import { useActorInspector } from "../actor-inspector-context"; +import type { ActorId } from "../queries"; import { WorkflowVisualizer } from "./workflow-visualizer"; -import type { WorkflowHistory } from "./workflow-types"; interface ActorWorkflowTabProps { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - actorId: string; + actorId: ActorId; } -// Sample workflow data for demonstration -// This will be replaced with actual API data when the workflow inspector is implemented -const sampleWorkflowHistory: WorkflowHistory = { - workflowId: "d579cc14-f798-42de-b4b6-2d10fa37d03b", - state: "completed", - nameRegistry: [ - "bootstrap", - "validate-input", - "checkpoint-after-validation", - "load-user-profile", - "compute-discount", - "ephemeral-cache-check", - "checkpoint-before-reserve", - "process-items-loop", - "fetch-item-0", - "compute-tax-0", - "reserve-inventory-0", - "fetch-item-1", - "compute-tax-1", - "reserve-inventory-1", - "short-cooldown", - "cooldown-sleep", - "join-dependencies", - "inventory", - "inventory-audit", - "pricing", - "pricing-method", - "shipping", - "shipping-zone", - "join-inventory-sleep", - "join-shipping-sleep", - "race-fulfillment", - "race-fast", - "race-fast-sleep", - "race-slow", - "race-slow-sleep", - "finalize", - ], - input: { - userId: "user-123", - itemIds: ["item-1", "item-2", "item-3", "item-4"], - deadlineMs: 1769561520725, - }, - output: { - workflowId: "0f6fe6cf-e6ca-46de-9512-72db613c2ad6", - bootstrap: { - requestId: "3958c87d-9dcb-4f20-8795-6317794e4351", - startedAt: 1769561520421, - }, - profile: { - id: "user-123", - tier: "standard", - flags: ["email-verified", "promo-eligible"], - }, - discount: { percent: 5, reason: "tier-discount" }, - items: { - receipts: [ - { - itemId: "item-1", - basePrice: 100, - tax: 8, - finalPrice: 103, - reservationId: "402885a5-2b88-423f-8500-6eb641073a5f", - }, - { - itemId: "item-2", - basePrice: 115, - tax: 9, - finalPrice: 118, - reservationId: "05055094-3c17-4527-9374-a16abd2a0ff6", - }, - ], - summary: { count: 4, total: 504 }, - }, - }, - history: [ - { - key: "bootstrap", - entry: { - id: "26299b0e-70e8-4b30-a8f2-c452775b224c", - location: [0], - kind: { - type: "step", - data: { - output: { - requestId: "97531aad-6075-47ea-90f1-31c19262b750", - startedAt: 1769562508317, - }, - }, - }, - dirty: false, - startedAt: 1769562508317, - completedAt: 1769562508350, - }, - }, - { - key: "validate-input", - entry: { - id: "c4668f41-8d82-4510-9b39-df2c107463d3", - location: [1], - kind: { - type: "step", - data: { output: true }, - }, - dirty: false, - startedAt: 1769562508355, - completedAt: 1769562508412, - }, - }, - { - key: "checkpoint-after-validation", - entry: { - id: "a4569615-5446-4ca0-85fc-f41a1502ea4a", - location: [2], - kind: { - type: "rollback_checkpoint", - data: { name: "checkpoint-after-validation" }, - }, - dirty: false, - startedAt: 1769562508420, - completedAt: 1769562508425, - }, - }, - { - key: "load-user-profile", - entry: { - id: "6eee86bd-3820-4ff5-ab6f-f54af37316d9", - location: [3], - kind: { - type: "step", - data: { - output: { - id: "user-123", - tier: "standard", - flags: ["email-verified", "promo-eligible"], - }, - }, - }, - dirty: false, - startedAt: 1769562508430, - completedAt: 1769562508892, - }, - }, - { - key: "compute-discount", - entry: { - id: "bb09df91-3e00-4138-8a2f-529be262091d", - location: [4], - kind: { - type: "step", - data: { output: { percent: 5, reason: "tier-discount" } }, - }, - dirty: false, - startedAt: 1769562508900, - completedAt: 1769562508945, - }, - }, - { - key: "ephemeral-cache-check", - entry: { - id: "e7ebd9ed-d71d-4f75-914b-dafd7e009087", - location: [5], - kind: { - type: "step", - data: { output: { cacheHit: false, tier: "standard" } }, - }, - dirty: false, - startedAt: 1769562508950, - completedAt: 1769562509012, - }, - }, - { - key: "short-cooldown", - entry: { - id: "cd8929e1-6e36-4fdc-bee6-ef9154e32433", - location: [6], - kind: { - type: "sleep", - data: { deadline: 1769562508608, state: "completed" }, - }, - dirty: false, - startedAt: 1769562508500, - completedAt: 1769562508608, - }, - }, - { - key: "join-dependencies", - entry: { - id: "ea93a137-db66-48fb-b1fd-9e6f5d5a084c", - location: [7], - kind: { - type: "join", - data: { - branches: { - inventory: { - status: "completed", - output: { - reserved: 4, - checked: 4, - notes: ["inventory-ok", "items=4"], - }, - }, - pricing: { - status: "completed", - output: { - subtotal: 504, - discount: 25, - total: 479, - method: "promo", - }, - }, - shipping: { - status: "completed", - output: { method: "ground", etaDays: 4, zone: "us-east" }, - }, - }, - }, - }, - dirty: false, - startedAt: 1769562509000, - completedAt: 1769562509500, - }, - }, - { - key: "join-dependencies/inventory/inventory-audit", - entry: { - id: "7ac02dbe-5dc0-45cb-8c91-bbb102962a1a", - location: [7, 8, 9], - kind: { - type: "step", - data: { output: 4 }, - }, - dirty: false, - startedAt: 1769562509050, - completedAt: 1769562509150, - }, - }, - { - key: "join-dependencies/inventory/join-inventory-sleep", - entry: { - id: "9390eac8-5a96-47a9-8e9c-bc2618c1e5f9", - location: [7, 8, 10], - kind: { - type: "sleep", - data: { deadline: 1769562509509, state: "completed" }, - }, - dirty: false, - startedAt: 1769562509160, - completedAt: 1769562509400, - }, - }, - { - key: "join-dependencies/pricing/pricing-method", - entry: { - id: "7a6047fc-4fd6-4945-89a5-95735b5609d6", - location: [7, 11, 12], - kind: { - type: "step", - data: { output: "promo" }, - }, - dirty: false, - startedAt: 1769562509050, - completedAt: 1769562509200, - }, - }, - { - key: "join-dependencies/shipping/shipping-zone", - entry: { - id: "0603cbf7-bc00-4ad1-8477-1558d6924e15", - location: [7, 13, 14], - kind: { - type: "step", - data: { output: "us-east" }, - }, - dirty: false, - startedAt: 1769562509050, - completedAt: 1769562509180, - }, - }, - { - key: "join-dependencies/shipping/join-shipping-sleep", - entry: { - id: "8de4fa8f-77d5-41c3-b73a-00ad4c66548b", - location: [7, 13, 15], - kind: { - type: "sleep", - data: { deadline: 1769562509509, state: "completed" }, - }, - dirty: false, - startedAt: 1769562509190, - completedAt: 1769562509450, - }, - }, - { - key: "race-fulfillment", - entry: { - id: "623243da-8b14-4b3b-9a83-aa4bdd1a4e43", - location: [16], - kind: { - type: "race", - data: { - winner: "race-fast", - branches: { - "race-fast": { - status: "completed", - output: { method: "express", cost: 18, etaDays: 1 }, - }, - "race-slow": { - status: "cancelled", - error: "Cancelled: lost race", - }, - }, - }, - }, - dirty: false, - startedAt: 1769562509520, - completedAt: 1769562509700, - }, - }, - { - key: "race-fulfillment/race-fast/race-fast-sleep", - entry: { - id: "74dc4690-7567-4dee-9067-33d7d9d8b9ef", - location: [16, 17, 18], - kind: { - type: "sleep", - data: { deadline: 1769562509643, state: "completed" }, - }, - dirty: false, - startedAt: 1769562509550, - completedAt: 1769562509650, - }, - }, - { - key: "finalize", - entry: { - id: "d9da246b-54c7-4681-afef-3eda5c2f66c0", - location: [19], - kind: { - type: "step", - data: { output: true }, - }, - dirty: false, - startedAt: 1769562509750, - completedAt: 1769562509800, - }, - }, - ], -}; +export function ActorWorkflowTab({ actorId }: ActorWorkflowTabProps) { + const inspector = useActorInspector(); -export function ActorWorkflowTab(_props: ActorWorkflowTabProps) { - // For now, show sample workflow data - // In the future, this will use the inspector API to get real workflow data - const isWorkflowEnabled = true; - const isLoading = false; - const isError = false; + const { data: isWorkflowEnabled, isLoading: isEnabledLoading } = useQuery( + inspector.actorIsWorkflowEnabledQueryOptions(actorId), + ); - if (isError) { - return ( - - Workflow Visualizer is currently unavailable. -
- See console/logs for more details. -
- ); - } + const { data: workflowData, isLoading: isHistoryLoading } = useQuery( + inspector.actorWorkflowHistoryQueryOptions(actorId), + ); + + const isLoading = isEnabledLoading || isHistoryLoading; + const workflow = workflowData?.history ?? null; if (isLoading) { return ( @@ -390,16 +38,24 @@ export function ActorWorkflowTab(_props: ActorWorkflowTabProps) { return (

- Workflow Visualizer is not enabled for this Actor.
This - feature requires a workflow-based Actor. + Workflow Visualizer is not enabled for this Actor.
{" "} + This feature requires a workflow-based Actor.

); } + if (!workflow) { + return ( + +

No workflow history available yet.

+
+ ); + } + return (
- +
); } diff --git a/frontend/src/components/actors/workflow/transform-workflow-history.ts b/frontend/src/components/actors/workflow/transform-workflow-history.ts new file mode 100644 index 0000000000..5c20c98831 --- /dev/null +++ b/frontend/src/components/actors/workflow/transform-workflow-history.ts @@ -0,0 +1,226 @@ +import * as cbor from "cbor-x"; +import type { + TransportWorkflowHistory, + decodeWorkflowHistoryTransport, +} from "rivetkit/inspector"; +import type { WorkflowHistory, EntryKind, EntryStatus, Location, SleepState, BranchStatus, BranchStatusType, EntryKindType } from "./workflow-types"; + +type TransportWorkflowEntry = TransportWorkflowHistory["entries"][number]; +type TransportWorkflowEntryMetadata = ReturnType; + +function decodeCborOrNull(data: ArrayBuffer | null): unknown { + if (data === null) return undefined; + try { + return cbor.decode(new Uint8Array(data)); + } catch { + return undefined; + } +} + +function transformLocation( + location: TransportWorkflowEntry["location"], +): Location { + return location.map((segment) => { + if (segment.tag === "WorkflowNameIndex") { + return segment.val; + } + return { + loop: segment.val.loop, + iteration: segment.val.iteration, + }; + }); +} + +function transformSleepState(state: string): SleepState { + switch (state) { + case "PENDING": + return "pending"; + case "COMPLETED": + return "completed"; + case "INTERRUPTED": + return "interrupted"; + default: + return "pending"; + } +} + +function transformBranchStatusType(status: string): BranchStatusType { + switch (status) { + case "PENDING": + return "pending"; + case "RUNNING": + return "running"; + case "COMPLETED": + return "completed"; + case "FAILED": + return "failed"; + case "CANCELLED": + return "cancelled"; + default: + return "pending"; + } +} + +function transformBranches( + branches: ReadonlyMap, +): Record { + const result: Record = {}; + for (const [name, branch] of branches) { + result[name] = { + status: transformBranchStatusType(branch.status), + output: decodeCborOrNull(branch.output), + error: branch.error ?? undefined, + }; + } + return result; +} + +function transformEntryKind(kind: TransportWorkflowEntry["kind"]): EntryKind { + switch (kind.tag) { + case "WorkflowStepEntry": + return { + type: "step", + data: { + output: decodeCborOrNull(kind.val.output), + error: kind.val.error ?? undefined, + }, + }; + case "WorkflowLoopEntry": + return { + type: "loop", + data: { + state: decodeCborOrNull(kind.val.state), + iteration: kind.val.iteration, + output: decodeCborOrNull(kind.val.output), + }, + }; + case "WorkflowSleepEntry": + return { + type: "sleep", + data: { + deadline: Number(kind.val.deadline), + state: transformSleepState(kind.val.state), + }, + }; + case "WorkflowMessageEntry": + return { + type: "message", + data: { + name: kind.val.name, + data: decodeCborOrNull(kind.val.messageData), + }, + }; + case "WorkflowRollbackCheckpointEntry": + return { + type: "rollback_checkpoint", + data: { name: kind.val.name }, + }; + case "WorkflowJoinEntry": + return { + type: "join", + data: { branches: transformBranches(kind.val.branches) }, + }; + case "WorkflowRaceEntry": + return { + type: "race", + data: { + winner: kind.val.winner, + branches: transformBranches(kind.val.branches), + }, + }; + case "WorkflowRemovedEntry": + return { + type: "removed", + data: { + originalType: kind.val.originalType as EntryKindType, + originalName: kind.val.originalName ?? undefined, + }, + }; + } +} + +function transformEntryStatus(status: string): EntryStatus { + switch (status) { + case "PENDING": + return "pending"; + case "RUNNING": + return "running"; + case "COMPLETED": + return "completed"; + case "FAILED": + return "failed"; + case "EXHAUSTED": + return "retrying"; + default: + return "pending"; + } +} + +function buildEntryKey( + location: Location, + nameRegistry: readonly string[], +): string { + return location + .map((segment) => { + if (typeof segment === "number") { + return nameRegistry[segment] ?? `unknown-${segment}`; + } + const loopName = nameRegistry[segment.loop] ?? `loop-${segment.loop}`; + return `${loopName}[${segment.iteration}]`; + }) + .join("/"); +} + +/** + * Transform a decoded TransportWorkflowHistory into the UI WorkflowHistory format. + */ +export function transformWorkflowHistory( + transport: TransportWorkflowHistory, +): WorkflowHistory { + const { nameRegistry, entries, entryMetadata } = transport; + + const history = entries.map((entry) => { + const location = transformLocation(entry.location); + const meta = entryMetadata.get(entry.id); + const key = buildEntryKey(location, nameRegistry); + + return { + key, + entry: { + id: entry.id, + location, + kind: transformEntryKind(entry.kind), + dirty: false, + status: meta ? transformEntryStatus(meta.status) : ("pending" as EntryStatus), + startedAt: meta ? Number(meta.createdAt) : undefined, + completedAt: meta?.completedAt != null ? Number(meta.completedAt) : undefined, + retryCount: meta ? meta.attempts : undefined, + error: meta?.error ?? undefined, + }, + }; + }); + + // Derive the overall workflow state from entry metadata. + const hasRunning = history.some((h) => h.entry.status === "running"); + const hasFailed = history.some((h) => h.entry.status === "failed"); + const hasPending = history.some((h) => h.entry.status === "pending"); + const allCompleted = history.length > 0 && history.every((h) => h.entry.status === "completed"); + + let state: WorkflowHistory["state"] = "pending"; + if (allCompleted) { + state = "completed"; + } else if (hasFailed) { + state = "failed"; + } else if (hasRunning) { + state = "running"; + } else if (hasPending && history.some((h) => h.entry.status === "completed")) { + state = "running"; + } + + return { + workflowId: entries[0]?.id ?? "unknown", + state, + nameRegistry: [...nameRegistry], + history, + }; +} diff --git a/frontend/src/components/actors/workflow/workflow-visualizer.tsx b/frontend/src/components/actors/workflow/workflow-visualizer.tsx index b17c2fcbf5..ebced706a2 100644 --- a/frontend/src/components/actors/workflow/workflow-visualizer.tsx +++ b/frontend/src/components/actors/workflow/workflow-visualizer.tsx @@ -1,48 +1,50 @@ "use client"; import { - faPlay, - faRefresh, + faArrowDown, + faArrowUp, + faBolt, + faCircleCheck, + faCircleExclamation, faClock, + faCodeMerge, faEnvelope, faFlag, - faCodeMerge, - faBolt, - faTrash, - faMagnifyingGlassPlus, faMagnifyingGlassMinus, + faMagnifyingGlassPlus, faMaximize, + faPlay, + faRefresh, faRotateLeft, - faCircleCheck, - faCircleExclamation, faSpinnerThird, - faArrowDown, - faArrowUp, + faTrash, faXmark, Icon, } from "@rivet-gg/icons"; -import { useState, useRef, useCallback, useMemo, useEffect } from "react"; -import { cn } from "@/components"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { cn, DiscreteCopyButton } from "@/components"; +import { ActorObjectInspector } from "../console/actor-inspector"; import type { - WorkflowHistory, EntryKindType, - ExtendedEntryType, EntryStatus, + ExtendedEntryType, HistoryItem, - LoopEntry, JoinEntry, - RaceEntry, + LoopEntry, + LoopIterationMarker, MessageEntry, + RaceEntry, RemovedEntry, - StepEntry, SleepEntry, - LoopIterationMarker, + StepEntry, + WorkflowHistory, } from "./workflow-types"; // Layout constants const NODE_WIDTH = 200; const NODE_HEIGHT = 52; -const NODE_HEIGHT_DETAILED = 100; +const DETAILS_HEIGHT = 80; +const NODE_HEIGHT_DETAILED = NODE_HEIGHT + DETAILS_HEIGHT + 4; const NODE_GAP_Y = 32; const BRANCH_GAP_X = 48; const BRANCH_GAP_Y = 48; @@ -142,17 +144,23 @@ function TypeIcon({ case "rollback_checkpoint": return ; case "join": - return ; + return ( + + ); case "race": return ; case "removed": return ; case "input": - return ; + return ( + + ); case "output": return ; default: - return ; + return ( + + ); } } @@ -313,7 +321,8 @@ function parseAndLayout( // Sort by location topLevel.sort( - (a, b) => (a.entry.location[0] as number) - (b.entry.location[0] as number), + (a, b) => + (a.entry.location[0] as number) - (b.entry.location[0] as number), ); const layoutNodes: LayoutNode[] = []; @@ -493,7 +502,11 @@ function parseAndLayout( let iterY = loopY + LOOP_PADDING_Y; let prevIterLastNode: LayoutNode | null = null; - for (const { iteration, nodes: iterNodes, height } of iterationLayouts) { + for (const { + iteration, + nodes: iterNodes, + height, + } of iterationLayouts) { iterY += ITERATION_HEADER; for (let i = 0; i < iterNodes.length; i++) { const ln = iterNodes[i]; @@ -521,7 +534,8 @@ function parseAndLayout( }); } else if (prevIterLastNode) { // Connect first node of this iteration to last node of previous iteration - const prevCompletedAtTs = prevIterLastNode.node.completedAt; + const prevCompletedAtTs = + prevIterLastNode.node.completedAt; const currStartedAtTs = ln.node.startedAt; const deltaMs = prevCompletedAtTs && currStartedAtTs @@ -667,7 +681,9 @@ function parseAndLayout( name: branchName, isWinner: branchName === winner, isCancelled: - entryType === "race" && winner !== null && branchName !== winner, + entryType === "race" && + winner !== null && + branchName !== winner, x: 0, y: 0, width: NODE_WIDTH, @@ -756,14 +772,19 @@ function parseAndLayout( }); // Merge connections - const mergeY = currentY + maxBranchHeight + containerPadding + BRANCH_GAP_Y; + const mergeY = + currentY + maxBranchHeight + containerPadding + BRANCH_GAP_Y; for (const branch of branchLayouts) { if (!branch.isCancelled) { - const lastBranchNode = branch.nodes[branch.nodes.length - 1]; - const lastNodeCompletedAt = lastBranchNode?.node.completedAt; + const lastBranchNode = + branch.nodes[branch.nodes.length - 1]; + const lastNodeCompletedAt = + lastBranchNode?.node.completedAt; const branchPaddingX = 20; - const branchCenterX = branch.x + branchPaddingX + NODE_WIDTH / 2; - const containerBottom = branch.y + branch.height + containerPadding; + const branchCenterX = + branch.x + branchPaddingX + NODE_WIDTH / 2; + const containerBottom = + branch.y + branch.height + containerPadding; connections.push({ id: `conn-merge-${branch.name}`, x1: branchCenterX, @@ -908,11 +929,6 @@ function SVGNode({ const isFailed = node.status === "failed"; const isRetrying = node.status === "retrying"; - // Get data preview for detailed mode - const dataPreview = detailedMode - ? JSON.stringify(node.data, null, 2).slice(0, 120) - : ""; - return ( {/* biome-ignore lint/a11y/noStaticElementInteractions: SVG node for workflow visualization */} @@ -933,7 +949,13 @@ function SVGNode({ height={height} rx={10} fill={colors.bg} - stroke={selected ? "#52525b" : isFailed ? "#ef4444" : colors.border} + stroke={ + selected + ? "#52525b" + : isFailed + ? "#ef4444" + : colors.border + } strokeWidth={isFailed ? 2 : 1} className="transition-all duration-150" /> @@ -982,21 +1004,51 @@ function SVGNode({ )} {/* Status indicator - right side for running/retrying */} {(isRunning || isRetrying) && ( - - + +
+ +
)} {isFailed && ( - - + +
+ +
)} {/* Icon box with color */} @@ -1011,8 +1063,21 @@ function SVGNode({ strokeWidth={1} strokeOpacity={0.3} /> - - + +
+ +
{/* Text */} - {node.name.length > 18 ? `${node.name.slice(0, 18)}...` : node.name} + {node.name.length > 18 + ? `${node.name.slice(0, 18)}...` + : node.name} {/* Detailed mode: show data preview */} {detailedMode && ( - -
- {dataPreview} - {dataPreview.length >= 120 ? "..." : ""} + +
+
)} @@ -1079,7 +1140,8 @@ function Connection({ // Show delta: always if >= 500ms, on hover for smaller, or always if showAllDeltas const isSignificantDelta = deltaMs !== undefined && deltaMs >= 500; const shouldShowDelta = - deltaMs !== undefined && (isSignificantDelta || isHovered || showAllDeltas); + deltaMs !== undefined && + (isSignificantDelta || isHovered || showAllDeltas); // Build path based on connection type let path: string; @@ -1157,7 +1219,8 @@ function Connection({ fontFamily="system-ui" dominantBaseline="middle" > - {deltaMs !== undefined && formatDuration(deltaMs)} later + {deltaMs !== undefined && formatDuration(deltaMs)}{" "} + later ); })()} @@ -1236,7 +1299,10 @@ export function WorkflowVisualizer({ if (e.button === 0 || e.button === 1) { e.preventDefault(); setIsPanning(true); - setPanStart({ x: e.clientX - transform.x, y: e.clientY - transform.y }); + setPanStart({ + x: e.clientX - transform.x, + y: e.clientY - transform.y, + }); } }, [transform], @@ -1257,7 +1323,7 @@ export function WorkflowVisualizer({ const handleMouseUp = useCallback(() => setIsPanning(false), []); - const handleWheel = useCallback((e: React.WheelEvent) => { + const handleWheel = useCallback((e: WheelEvent) => { e.preventDefault(); if (e.ctrlKey || e.metaKey) { @@ -1287,6 +1353,14 @@ export function WorkflowVisualizer({ } }, []); + // Attach wheel listener as non-passive so preventDefault() works on touchpad gestures. + useEffect(() => { + const el = containerRef.current; + if (!el) return; + el.addEventListener("wheel", handleWheel, { passive: false }); + return () => el.removeEventListener("wheel", handleWheel); + }, [handleWheel]); + const zoomIn = () => setTransform((t) => ({ ...t, scale: Math.min(t.scale * 1.2, 2) })); const zoomOut = () => @@ -1294,8 +1368,10 @@ export function WorkflowVisualizer({ const resetView = () => setTransform({ x: 60, y: 60, scale: 1 }); const fitView = () => { if (containerRef.current) { - const { width, height } = containerRef.current.getBoundingClientRect(); - const scale = Math.min(width / 800, height / layout.totalHeight, 1) * 0.85; + const { width, height } = + containerRef.current.getBoundingClientRect(); + const scale = + Math.min(width / 800, height / layout.totalHeight, 1) * 0.85; setTransform({ x: 60, y: 60, scale }); } }; @@ -1311,7 +1387,6 @@ export function WorkflowVisualizer({ onMouseMove={handleMouseMove} onMouseUp={handleMouseUp} onMouseLeave={handleMouseUp} - onWheel={handleWheel} style={{ cursor: isPanning ? "grabbing" : "grab" }} > {/* Dot grid */} @@ -1410,15 +1485,28 @@ export function WorkflowVisualizer({ strokeWidth={1} /> - +
+ +
{ const baseColor = - group.type === "join" ? "#06b6d4" : "#ec4899"; - const iconDef = group.type === "join" ? faCodeMerge : faBolt; + group.type === "join" + ? "#06b6d4" + : "#ec4899"; + const iconDef = + group.type === "join" + ? faCodeMerge + : faBolt; return ( {group.branches.map((branch) => { @@ -1452,9 +1545,12 @@ export function WorkflowVisualizer({ : baseColor; const containerX = branch.x; const containerY = branch.y; - const containerWidth = branch.width + 40; - const containerHeight = branch.height + 48 + 20; - const containerCenterX = containerX + containerWidth / 2; + const containerWidth = + branch.width + 40; + const containerHeight = + branch.height + 48 + 20; + const containerCenterX = + containerX + containerWidth / 2; return ( - +
+ +
@@ -1571,14 +1688,20 @@ export function WorkflowVisualizer({ onClick={zoomIn} className="flex h-7 w-7 items-center justify-center rounded hover:bg-secondary" > - +
{Math.round(transform.scale * 100)}% @@ -1610,7 +1739,8 @@ export function WorkflowVisualizer({ Workflow
- {workflow.workflowId.slice(0, 8)}... | {workflow.state} + {workflow.workflowId.slice(0, 8)}... |{" "} + {workflow.state}
@@ -1618,16 +1748,22 @@ export function WorkflowVisualizer({ setDetailedMode(e.target.checked)} + onChange={(e) => + setDetailedMode(e.target.checked) + } className="h-3.5 w-3.5 rounded border-border bg-secondary accent-primary" /> - Detailed + + Detailed +
)} @@ -1710,7 +1849,8 @@ export function WorkflowVisualizer({ Retries
- {hoveredNode.node.retryCount} attempt(s) + {hoveredNode.node.retryCount}{" "} + attempt(s)
)} @@ -1719,12 +1859,16 @@ export function WorkflowVisualizer({ Data
-										{JSON.stringify(hoveredNode.node.data, null, 2).slice(
-											0,
-											200,
-										)}
-										{JSON.stringify(hoveredNode.node.data, null, 2).length >
-										200
+										{JSON.stringify(
+											hoveredNode.node.data,
+											null,
+											2,
+										).slice(0, 200)}
+										{JSON.stringify(
+											hoveredNode.node.data,
+											null,
+											2,
+										).length > 200
 											? "..."
 											: ""}
 									
@@ -1737,19 +1881,23 @@ export function WorkflowVisualizer({ {/* Bottom details panel */} {selectedNode && ( -
-
+
+ {/* Top row: identity + close */} +
@@ -1761,114 +1909,133 @@ export function WorkflowVisualizer({ {selectedNode.type}
-
- -
- -
-
- Key -
-
- {selectedNode.key} +
+ {selectedNode.status}
+ {selectedNode.retryCount && + selectedNode.retryCount > 0 && ( +
+ {selectedNode.retryCount} retry(s) +
+ )}
+ +
-
+ {/* Bottom row: metadata grid */} +
+
- Data + Key
-
-								{JSON.stringify(selectedNode.data, null, 2)}
-							
+ + + {selectedNode.key} + +
{selectedNode.startedAt && ( -
+
Started
-
- {new Date(selectedNode.startedAt).toLocaleString()} +
+ {new Date( + selectedNode.startedAt, + ).toLocaleString()}
)} {selectedNode.completedAt && ( -
+
Completed
-
- {new Date(selectedNode.completedAt).toLocaleString()} +
+ {new Date( + selectedNode.completedAt, + ).toLocaleString()}
)} {selectedNode.duration !== undefined && ( -
+
Duration
-
+
{formatDuration(selectedNode.duration)}
)} +
-
-
- Status -
-
- {selectedNode.status} -
-
- - {selectedNode.retryCount && selectedNode.retryCount > 0 && ( -
-
- Retries -
-
- {selectedNode.retryCount} + {/* Data + Error row */} + {(selectedNode.data || selectedNode.error) && ( +
+
+
+ Data
+
+									
+								
- )} - {selectedNode.error && ( -
-
- Error -
-
- {selectedNode.error} + {selectedNode.error && ( +
+
+ Error +
+
+ +
-
- )} - - -
+ )} +
+ )}
)}