diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9cf22e769d..c8eb5f8007 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1930,6 +1930,104 @@ importers: specifier: ^5 version: 5.9.2 + examples/queue-sandbox: + dependencies: + '@hono/node-server': + specifier: ^1.19.7 + version: 1.19.9(hono@4.11.3) + '@hono/node-ws': + specifier: ^1.2.0 + version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.3))(hono@4.11.3) + '@rivetkit/react': + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/react + hono: + specifier: ^4.11.3 + version: 4.11.3 + react: + specifier: 19.1.0 + version: 19.1.0 + react-dom: + specifier: 19.1.0 + version: 19.1.0(react@19.1.0) + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + srvx: + specifier: ^0.10.0 + version: 0.10.0 + devDependencies: + '@types/node': + specifier: ^22.13.9 + version: 22.19.5 + '@types/react': + specifier: ^19 + version: 19.2.2 + '@types/react-dom': + specifier: ^19 + version: 19.2.2(@types/react@19.2.2) + '@vitejs/plugin-react': + specifier: ^4.2.0 + version: 4.7.0(vite@5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)) + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.5.2 + version: 5.9.3 + vite: + specifier: ^5.0.0 + version: 5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) + vite-plugin-srvx: + specifier: ^1.0.0 + version: 1.0.0(srvx@0.10.0)(vite@5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)) + + examples/queue-sandbox-vercel: + dependencies: + '@hono/node-server': + specifier: ^1.19.7 + version: 1.19.9(hono@4.11.3) + '@hono/node-ws': + specifier: ^1.2.0 + version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.3))(hono@4.11.3) + '@rivetkit/react': + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/react + hono: + specifier: ^4.11.3 + version: 4.11.3 + react: + specifier: 19.1.0 + version: 19.1.0 + react-dom: + specifier: 19.1.0 + version: 19.1.0(react@19.1.0) + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + devDependencies: + '@types/node': + specifier: ^22.13.9 + version: 22.19.5 + '@types/react': + specifier: ^19 + version: 19.2.2 + '@types/react-dom': + specifier: ^19 + version: 19.2.2(@types/react@19.2.2) + '@vitejs/plugin-react': + specifier: ^4.2.0 + version: 4.7.0(vite@5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)) + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.5.2 + version: 5.9.3 + vite: + specifier: ^5.0.0 + version: 5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) + examples/raw-fetch-handler: dependencies: '@hono/node-server': @@ -2805,6 +2903,52 @@ importers: specifier: ^1.0.0 version: 1.0.0(srvx@0.10.0)(vite@5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)) + examples/workflow-sandbox-vercel: + dependencies: + '@hono/node-server': + specifier: ^1.19.7 + version: 1.19.9(hono@4.11.3) + '@hono/node-ws': + specifier: ^1.3.0 + version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.3))(hono@4.11.3) + '@rivetkit/react': + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/react + hono: + specifier: ^4.11.3 + version: 4.11.3 + react: + specifier: 19.1.0 + version: 19.1.0 + react-dom: + specifier: 19.1.0 + version: 19.1.0(react@19.1.0) + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + devDependencies: + '@types/node': + specifier: ^22.13.9 + version: 22.19.5 + '@types/react': + specifier: ^19 + version: 19.2.2 + '@types/react-dom': + specifier: ^19 + version: 19.2.2(@types/react@19.2.2) + '@vitejs/plugin-react': + specifier: ^4.2.0 + version: 4.7.0(vite@5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)) + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.5.2 + version: 5.9.3 + vite: + specifier: ^5.0.0 + version: 5.4.20(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) + frontend: dependencies: '@clerk/clerk-js': @@ -12004,6 +12148,7 @@ packages: glob@10.5.0: resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@11.0.3: @@ -15424,7 +15569,7 @@ packages: tar@7.5.2: resolution: {integrity: sha512-7NyxrTE4Anh8km8iEy7o0QYPs+0JKBTj5ZaqHg6B39erLg0qYXN3BijtShwbsNSvQ+LN75+KV+C4QR/f6Gwnpg==} engines: {node: '>=18'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me temp-dir@2.0.0: resolution: {integrity: sha512-aoBAniQmmwtcKp/7BzsH8Cxzv8OL736p7v1ihGb5e9DJ9kTwGWHrQrVB5+lfVDzfGrdRzXch+ig7LHaY1JTOrg==} diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index d2b698f8b1..ae9c50a3af 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -173,7 +173,7 @@ ], "scripts": { "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts src/workflow/mod.ts", - "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v3.bare -o dist/schemas/actor-inspector/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v4.bare -o dist/schemas/actor-inspector/v4.ts" + "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/persist/v1.bare -o dist/schemas/persist/v1.ts && ./scripts/compile-bare.ts compile schemas/transport/v1.bare -o dist/schemas/transport/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts", "check-types": "tsc --noEmit", "lint": "biome check .", "lint:fix": "biome check --write .", diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare index ef349e5d24..19e572b57b 100644 --- a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare @@ -1,7 +1,7 @@ # MARK: Message To Server type PatchStateRequest struct { - state: data + state: data } type ActionRequest struct { @@ -18,15 +18,23 @@ type ConnectionsRequest struct { id: uint } -type EventsRequest struct { +type RpcsListRequest struct { id: uint } -type ClearEventsRequest struct { +type TraceQueryRequest struct { id: uint + startMs: uint + endMs: uint + limit: uint } -type RpcsListRequest struct { +type QueueRequest struct { + id: uint + limit: uint +} + +type WorkflowHistoryRequest struct { id: uint } @@ -35,9 +43,10 @@ type ToServerBody union { StateRequest | ConnectionsRequest | ActionRequest | - EventsRequest | - ClearEventsRequest | - RpcsListRequest + RpcsListRequest | + TraceQueryRequest | + QueueRequest | + WorkflowHistoryRequest } type ToServer struct { @@ -53,55 +62,18 @@ type Connection struct { details: data } -type ActionEvent struct { - name: str - args: data - connId: str -} - -type BroadcastEvent struct { - eventName: str - args: data -} - -type SubscribeEvent struct { - eventName: str - connId: str -} - -type UnSubscribeEvent struct { - eventName: str - connId: str -} - -type FiredEvent struct { - eventName: str - args: data - connId: str -} - -type EventBody union { - ActionEvent | - BroadcastEvent | - SubscribeEvent | - UnSubscribeEvent | - FiredEvent -} - -type Event struct { - id: str - timestamp: uint - body: EventBody -} +# Workflow history is encoded using schemas/transport. +type WorkflowHistory data type Init struct { connections: list - events: list state: optional isStateEnabled: bool rpcs: list isDatabaseEnabled: bool queueSize: uint + workflowHistory: optional + isWorkflowEnabled: bool } type ConnectionsResponse struct { @@ -115,28 +87,52 @@ type StateResponse struct { isStateEnabled: bool } -type EventsResponse struct { +type ActionResponse struct { rid: uint - events: list + output: data } -type ActionResponse struct { +type TraceQueryResponse struct { rid: uint - output: data + payload: data } -type StateUpdated struct { - state: State +type QueueMessageSummary struct { + id: uint + name: str + createdAtMs: uint +} + +type QueueStatus struct { + size: uint + maxSize: uint + messages: list + truncated: bool } -type EventsUpdated struct { - events: list +type QueueResponse struct { + rid: uint + status: QueueStatus +} + +type WorkflowHistoryResponse struct { + rid: uint + history: optional + isWorkflowEnabled: bool +} + +type StateUpdated struct { + state: State } type QueueUpdated struct { queueSize: uint } +type WorkflowHistoryUpdated struct { + history: WorkflowHistory +} + type RpcsListResponse struct { rid: uint rpcs: list @@ -145,6 +141,7 @@ type RpcsListResponse struct { type ConnectionsUpdated struct { connections: list } + type Error struct { message: str } @@ -152,13 +149,15 @@ type Error struct { type ToClientBody union { StateResponse | ConnectionsResponse | - EventsResponse | ActionResponse | ConnectionsUpdated | - EventsUpdated | QueueUpdated | StateUpdated | + WorkflowHistoryUpdated | RpcsListResponse | + TraceQueryResponse | + QueueResponse | + WorkflowHistoryResponse | Error | Init } diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare deleted file mode 100644 index 0e10d1c97a..0000000000 --- a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare +++ /dev/null @@ -1,120 +0,0 @@ -# MARK: Message To Server - -type PatchStateRequest struct { - state: data -} - -type ActionRequest struct { - id: uint - name: str - args: data -} - -type StateRequest struct { - id: uint -} - -type ConnectionsRequest struct { - id: uint -} - -type RpcsListRequest struct { - id: uint -} - -type TraceQueryRequest struct { - id: uint - startMs: uint - endMs: uint - limit: uint -} - -type ToServerBody union { - PatchStateRequest | - StateRequest | - ConnectionsRequest | - ActionRequest | - RpcsListRequest | - TraceQueryRequest -} - -type ToServer struct { - body: ToServerBody -} - -# MARK: Message To Client - -type State data - -type Connection struct { - id: str - details: data -} - -type Init struct { - connections: list - state: optional - isStateEnabled: bool - rpcs: list - isDatabaseEnabled: bool - queueSize: uint -} - -type ConnectionsResponse struct { - rid: uint - connections: list -} - -type StateResponse struct { - rid: uint - state: optional - isStateEnabled: bool -} - -type ActionResponse struct { - rid: uint - output: data -} - -type TraceQueryResponse struct { - rid: uint - payload: data -} - -type StateUpdated struct { - state: State -} - -type QueueUpdated struct { - queueSize: uint -} - -type RpcsListResponse struct { - rid: uint - rpcs: list -} - -type ConnectionsUpdated struct { - connections: list -} - -type Error struct { - message: str -} - -type ToClientBody union { - StateResponse | - ConnectionsResponse | - ActionResponse | - ConnectionsUpdated | - QueueUpdated | - StateUpdated | - RpcsListResponse | - TraceQueryResponse | - Error | - Init -} - -type ToClient struct { - body: ToClientBody -} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v4.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v4.bare deleted file mode 100644 index 94073fc69d..0000000000 --- a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v4.bare +++ /dev/null @@ -1,145 +0,0 @@ -# MARK: Message To Server - -type PatchStateRequest struct { - state: data -} - -type ActionRequest struct { - id: uint - name: str - args: data -} - -type StateRequest struct { - id: uint -} - -type ConnectionsRequest struct { - id: uint -} - -type RpcsListRequest struct { - id: uint -} - -type TraceQueryRequest struct { - id: uint - startMs: uint - endMs: uint - limit: uint -} - -type QueueRequest struct { - id: uint - limit: uint -} - -type ToServerBody union { - PatchStateRequest | - StateRequest | - ConnectionsRequest | - ActionRequest | - RpcsListRequest | - TraceQueryRequest | - QueueRequest -} - -type ToServer struct { - body: ToServerBody -} - -# MARK: Message To Client - -type State data - -type Connection struct { - id: str - details: data -} - -type Init struct { - connections: list - state: optional - isStateEnabled: bool - rpcs: list - isDatabaseEnabled: bool - queueSize: uint -} - -type ConnectionsResponse struct { - rid: uint - connections: list -} - -type StateResponse struct { - rid: uint - state: optional - isStateEnabled: bool -} - -type ActionResponse struct { - rid: uint - output: data -} - -type TraceQueryResponse struct { - rid: uint - payload: data -} - -type QueueMessageSummary struct { - id: uint - name: str - createdAtMs: uint -} - -type QueueStatus struct { - size: uint - maxSize: uint - messages: list - truncated: bool -} - -type QueueResponse struct { - rid: uint - status: QueueStatus -} - -type StateUpdated struct { - state: State -} - -type QueueUpdated struct { - queueSize: uint -} - -type RpcsListResponse struct { - rid: uint - rpcs: list -} - -type ConnectionsUpdated struct { - connections: list -} - -type Error struct { - message: str -} - -type ToClientBody union { - StateResponse | - ConnectionsResponse | - ActionResponse | - ConnectionsUpdated | - QueueUpdated | - StateUpdated | - RpcsListResponse | - TraceQueryResponse | - QueueResponse | - Error | - Init -} - -type ToClient struct { - body: ToClientBody -} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/persist/v1.bare b/rivetkit-typescript/packages/rivetkit/schemas/persist/v1.bare new file mode 100644 index 0000000000..ef9594aa50 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/persist/v1.bare @@ -0,0 +1,203 @@ +# Workflow Engine Persistence Schema v1 +# +# This schema defines the binary encoding for workflow engine persistence. +# Types marked with `data` are arbitrary binary blobs (for user-provided data). + +# Opaque user data (CBOR-encoded) +type Cbor data + +# MARK: Location +# Index into the entry name registry +type NameIndex u32 + +# Marker for a loop iteration in a location path +type LoopIterationMarker struct { + loop: NameIndex + iteration: u32 +} + +# A segment in a location path - either a name index or a loop iteration marker +type PathSegment union { + NameIndex | + LoopIterationMarker +} + +# Location identifies where an entry exists in the workflow execution tree +type Location list + +# MARK: Entry Status +type EntryStatus enum { + PENDING + RUNNING + COMPLETED + FAILED + EXHAUSTED +} + +# MARK: Sleep State +type SleepState enum { + PENDING + COMPLETED + INTERRUPTED +} + +# MARK: Branch Status +type BranchStatusType enum { + PENDING + RUNNING + COMPLETED + FAILED + CANCELLED +} + +# MARK: Step Entry +type StepEntry struct { + # Output value (CBOR-encoded arbitrary data) + output: optional + # Error message if step failed + error: optional +} + +# MARK: Loop Entry +type LoopEntry struct { + # Loop state (CBOR-encoded arbitrary data) + state: Cbor + # Current iteration number + iteration: u32 + # Output value if loop completed (CBOR-encoded arbitrary data) + output: optional +} + +# MARK: Sleep Entry +type SleepEntry struct { + # Deadline timestamp in milliseconds + deadline: u64 + # Current sleep state + state: SleepState +} + +# MARK: Message Entry + type MessageEntry struct { + # Message name + name: str + # Message data (CBOR-encoded arbitrary data) + messageData: Cbor + } + + # MARK: Rollback Checkpoint Entry + type RollbackCheckpointEntry struct { + # Checkpoint name + name: str + } + + # MARK: Branch Status + +type BranchStatus struct { + status: BranchStatusType + # Output value if completed (CBOR-encoded arbitrary data) + output: optional + # Error message if failed + error: optional +} + +# MARK: Join Entry +type JoinEntry struct { + # Map of branch name to status + branches: map +} + +# MARK: Race Entry +type RaceEntry struct { + # Name of the winning branch, or null if no winner yet + winner: optional + # Map of branch name to status + branches: map +} + +# MARK: Removed Entry +type RemovedEntry struct { + # Original entry type before removal + originalType: str + # Original entry name + originalName: optional +} + +# MARK: Entry Kind +# Type-specific entry data +type EntryKind union { + StepEntry | + LoopEntry | + SleepEntry | + MessageEntry | + RollbackCheckpointEntry | + JoinEntry | + RaceEntry | + RemovedEntry +} + +# MARK: Entry +# An entry in the workflow history +type Entry struct { + # Unique entry ID + id: str + # Location in the workflow tree + location: Location + # Entry kind and data + kind: EntryKind +} + +# MARK: Entry Metadata +# Metadata for an entry (stored separately, lazily loaded) +type EntryMetadata struct { + status: EntryStatus + # Error message if failed + error: optional + # Number of execution attempts + attempts: u32 + # Last attempt timestamp in milliseconds + lastAttemptAt: u64 + # Creation timestamp in milliseconds + createdAt: u64 + # Completion timestamp in milliseconds + completedAt: optional + # Rollback completion timestamp in milliseconds + rollbackCompletedAt: optional + # Rollback error message if failed + rollbackError: optional +} + +# MARK: Message +# A message in the queue +type Message struct { + # Unique message ID (used as KV key) + id: str + # Message name + name: str + # Message data (CBOR-encoded arbitrary data) + messageData: Cbor + # Timestamp when message was sent in milliseconds + sentAt: u64 +} + +# MARK: Workflow State +type WorkflowState enum { + PENDING + RUNNING + SLEEPING + FAILED + COMPLETED + ROLLING_BACK +} + +# MARK: Workflow Metadata +# Workflow-level metadata stored separately from entries +type WorkflowMetadata struct { + # Current workflow state + state: WorkflowState + # Workflow output if completed (CBOR-encoded arbitrary data) + output: optional + # Error message if failed + error: optional + # Workflow version hash for migration detection + version: optional +} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/transport/v1.bare b/rivetkit-typescript/packages/rivetkit/schemas/transport/v1.bare new file mode 100644 index 0000000000..eb26957359 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/transport/v1.bare @@ -0,0 +1,175 @@ +# Workflow History Transport Schema v1 +# +# This schema defines the binary encoding for workflow history snapshots +# sent over the inspector. + +# Opaque user data (CBOR-encoded) +type WorkflowCbor data + +# MARK: Location +# Index into the entry name registry +type WorkflowNameIndex u32 + +# Marker for a loop iteration in a location path +type WorkflowLoopIterationMarker struct { + loop: WorkflowNameIndex + iteration: u32 +} + +# A segment in a location path - either a name index or a loop iteration marker +type WorkflowPathSegment union { + WorkflowNameIndex | + WorkflowLoopIterationMarker +} + +# Location identifies where an entry exists in the workflow execution tree +type WorkflowLocation list + +# MARK: Entry Status +type WorkflowEntryStatus enum { + PENDING + RUNNING + COMPLETED + FAILED + EXHAUSTED +} + +# MARK: Sleep State +type WorkflowSleepState enum { + PENDING + COMPLETED + INTERRUPTED +} + +# MARK: Branch Status +type WorkflowBranchStatusType enum { + PENDING + RUNNING + COMPLETED + FAILED + CANCELLED +} + +# MARK: Step Entry +type WorkflowStepEntry struct { + # Output value (CBOR-encoded arbitrary data) + output: optional + # Error message if step failed + error: optional +} + +# MARK: Loop Entry +type WorkflowLoopEntry struct { + # Loop state (CBOR-encoded arbitrary data) + state: WorkflowCbor + # Current iteration number + iteration: u32 + # Output value if loop completed (CBOR-encoded arbitrary data) + output: optional +} + +# MARK: Sleep Entry +type WorkflowSleepEntry struct { + # Deadline timestamp in milliseconds + deadline: u64 + # Current sleep state + state: WorkflowSleepState +} + +# MARK: Message Entry + type WorkflowMessageEntry struct { + # Message name + name: str + # Message data (CBOR-encoded arbitrary data) + messageData: WorkflowCbor + } + + # MARK: Rollback Checkpoint Entry + type WorkflowRollbackCheckpointEntry struct { + # Checkpoint name + name: str + } + + # MARK: Branch Status + +type WorkflowBranchStatus struct { + status: WorkflowBranchStatusType + # Output value if completed (CBOR-encoded arbitrary data) + output: optional + # Error message if failed + error: optional +} + +# MARK: Join Entry +type WorkflowJoinEntry struct { + # Map of branch name to status + branches: map +} + +# MARK: Race Entry +type WorkflowRaceEntry struct { + # Name of the winning branch, or null if no winner yet + winner: optional + # Map of branch name to status + branches: map +} + +# MARK: Removed Entry +type WorkflowRemovedEntry struct { + # Original entry type before removal + originalType: str + # Original entry name + originalName: optional +} + +# MARK: Entry Kind +# Type-specific entry data +type WorkflowEntryKind union { + WorkflowStepEntry | + WorkflowLoopEntry | + WorkflowSleepEntry | + WorkflowMessageEntry | + WorkflowRollbackCheckpointEntry | + WorkflowJoinEntry | + WorkflowRaceEntry | + WorkflowRemovedEntry +} + +# MARK: Entry +# An entry in the workflow history +type WorkflowEntry struct { + # Unique entry ID + id: str + # Location in the workflow tree + location: WorkflowLocation + # Entry kind and data + kind: WorkflowEntryKind +} + +# MARK: Entry Metadata +# Metadata for an entry (stored separately, lazily loaded) +type WorkflowEntryMetadata struct { + status: WorkflowEntryStatus + # Error message if failed + error: optional + # Number of execution attempts + attempts: u32 + # Last attempt timestamp in milliseconds + lastAttemptAt: u64 + # Creation timestamp in milliseconds + createdAt: u64 + # Completion timestamp in milliseconds + completedAt: optional + # Rollback completion timestamp in milliseconds + rollbackCompletedAt: optional + # Rollback error message if failed + rollbackError: optional +} + +# MARK: Workflow History +# A snapshot of workflow history for inspector transport. +type WorkflowHistory struct { + nameRegistry: list + entries: list + entryMetadata: map +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 99f1b14b9a..567dc6c30c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -42,6 +42,32 @@ const zFunction = < T extends (...args: any[]) => any = (...args: unknown[]) => unknown, >() => z.custom((val) => typeof val === "function"); +export type InspectorUnsubscribe = () => void; + +export interface WorkflowInspectorConfig { + getHistory: () => THistory | null; + onHistoryUpdated?: ( + listener: (history: THistory) => void, + ) => InspectorUnsubscribe; +} + +export interface RunInspectorConfig { + workflow?: WorkflowInspectorConfig; +} + +const WorkflowInspectorConfigSchema = z.object({ + getHistory: zFunction["getHistory"]>(), + onHistoryUpdated: zFunction< + NonNullable["onHistoryUpdated"]> + >().optional(), +}); + +const RunInspectorConfigSchema = z + .object({ + workflow: WorkflowInspectorConfigSchema.optional(), + }) + .optional(); + // Schema for run handler with metadata export const RunConfigSchema = z.object({ /** Display name for the actor in the Inspector UI. */ @@ -50,6 +76,8 @@ export const RunConfigSchema = z.object({ icon: z.string().optional(), /** The run handler function. */ run: zFunction(), + /** Inspector integration for long-running run handlers. */ + inspector: RunInspectorConfigSchema.optional(), }); export type RunConfig = z.infer; @@ -73,6 +101,14 @@ export function getRunMetadata( return { name: run.name, icon: run.icon }; } +/** Extract run inspector configuration if provided. */ +export function getRunInspectorConfig( + run: ((...args: any[]) => any) | RunConfig | undefined, +): RunInspectorConfig | undefined { + if (!run || typeof run === "function") return undefined; + return run.inspector; +} + // This schema is used to validate the input at runtime. The generic types are defined below in `ActorConfig`. // // We don't use Zod generics with `z.custom` because: diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts index d2d542846a..34235226c6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts @@ -5,6 +5,7 @@ import { CONN_DRIVER_SYMBOL, CONN_STATE_MANAGER_SYMBOL, } from "@/actor/conn/mod"; +import { getRunInspectorConfig } from "@/actor/config"; import { ActionContext } from "@/actor/contexts/action"; import * as actorErrors from "@/actor/errors"; import type { AnyActorInstance } from "@/mod"; @@ -15,6 +16,7 @@ interface ActorInspectorEmitterEvents { stateUpdated: (state: unknown) => void; connectionsUpdated: () => void; queueUpdated: () => void; + workflowHistoryUpdated: (history: schema.WorkflowHistory) => void; } export type Connection = Omit & { @@ -29,9 +31,22 @@ export class ActorInspector { public readonly emitter = createNanoEvents(); #lastQueueSize = 0; + #workflowInspector?: NonNullable< + ReturnType + >["workflow"]; constructor(private readonly actor: AnyActorInstance) { this.#lastQueueSize = actor.queueManager?.size ?? 0; + const runInspector = getRunInspectorConfig(actor.config.run); + this.#workflowInspector = runInspector?.workflow; + if (this.#workflowInspector?.onHistoryUpdated) { + this.#workflowInspector.onHistoryUpdated((history) => { + this.emitter.emit( + "workflowHistoryUpdated", + history as schema.WorkflowHistory, + ); + }); + } } getQueueSize() { @@ -66,6 +81,18 @@ export class ActorInspector { this.emitter.emit("queueUpdated"); } + isWorkflowEnabled() { + return this.#workflowInspector !== undefined; + } + + getWorkflowHistory(): schema.WorkflowHistory | null { + if (!this.#workflowInspector) { + return null; + } + const history = this.#workflowInspector.getHistory(); + return (history ?? null) as schema.WorkflowHistory | null; + } + // actor accessor methods isDatabaseEnabled() { diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts index 932007195f..5b8cae8e7b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts @@ -36,6 +36,8 @@ export async function handleWebSocketInspectorConnect({ isStateEnabled: inspector.isStateEnabled(), isDatabaseEnabled: inspector.isDatabaseEnabled(), queueSize: BigInt(inspector.getQueueSize()), + workflowHistory: inspector.getWorkflowHistory(), + isWorkflowEnabled: inspector.isWorkflowEnabled(), }, }, }); @@ -67,6 +69,14 @@ export async function handleWebSocketInspectorConnect({ }, }); }), + inspector.emitter.on("workflowHistoryUpdated", (history) => { + sendMessage(ws, { + body: { + tag: "WorkflowHistoryUpdated", + val: { history }, + }, + }); + }), ); }, onMessage: async (evt: RivetMessageEvent, ws: WSContext) => { @@ -154,6 +164,18 @@ export async function handleWebSocketInspectorConnect({ }, }, }); + } else if (message.body.tag === "WorkflowHistoryRequest") { + sendMessage(ws, { + body: { + tag: "WorkflowHistoryResponse", + val: { + rid: message.body.val.id, + history: inspector.getWorkflowHistory(), + isWorkflowEnabled: + inspector.isWorkflowEnabled(), + }, + }, + }); } else { assertUnreachable(message.body); } diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/mod.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/mod.ts index 1ee3127821..642fd09037 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/mod.ts @@ -1,2 +1,4 @@ export * from "../schemas/actor-inspector/mod"; export * from "../schemas/actor-inspector/versioned"; +export { decodeWorkflowHistoryTransport, encodeWorkflowHistoryTransport } from "./transport"; +export type { WorkflowHistory as TransportWorkflowHistory } from "../schemas/transport/mod"; diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/transport.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/transport.ts new file mode 100644 index 0000000000..810441796a --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/transport.ts @@ -0,0 +1,18 @@ +import type { WorkflowHistory } from "@/schemas/transport/mod"; +import { + decodeWorkflowHistory, + encodeWorkflowHistory, +} from "@/schemas/transport/mod"; +import { bufferToArrayBuffer, toUint8Array } from "@/utils"; + +export function encodeWorkflowHistoryTransport( + history: WorkflowHistory, +): ArrayBuffer { + return bufferToArrayBuffer(encodeWorkflowHistory(history)); +} + +export function decodeWorkflowHistoryTransport( + data: ArrayBuffer | ArrayBufferView, +): WorkflowHistory { + return decodeWorkflowHistory(toUint8Array(data)); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts index 6797a7306b..87a57d3b2c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/actor-inspector/v4"; +export * from "../../../dist/schemas/actor-inspector/v2"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts index 873074b074..73379b94f0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts @@ -2,14 +2,15 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/actor-inspector/v1"; import * as v2 from "../../../dist/schemas/actor-inspector/v2"; -import * as v3 from "../../../dist/schemas/actor-inspector/v3"; -import * as v4 from "../../../dist/schemas/actor-inspector/v4"; -export const CURRENT_VERSION = 4; +export const CURRENT_VERSION = 2; const EVENTS_DROPPED_ERROR = "inspector.events_dropped"; +const WORKFLOW_HISTORY_DROPPED_ERROR = "inspector.workflow_history_dropped"; +const QUEUE_DROPPED_ERROR = "inspector.queue_dropped"; +const TRACE_DROPPED_ERROR = "inspector.trace_dropped"; -// Converter from v1 to v2: Add queueSize field to Init message +// Converter from v1 to v2: Drop events in Init and add new fields const v1ToClientToV2 = (v1Data: v1.ToClient): v2.ToClient => { if (v1Data.body.tag === "Init") { const init = v1Data.body.val as v1.Init; @@ -17,147 +18,128 @@ const v1ToClientToV2 = (v1Data: v1.ToClient): v2.ToClient => { body: { tag: "Init", val: { - ...init, + connections: init.connections, + state: init.state, + isStateEnabled: init.isStateEnabled, + rpcs: init.rpcs, + isDatabaseEnabled: init.isDatabaseEnabled, queueSize: 0n, + workflowHistory: null, + isWorkflowEnabled: false, }, }, }; } - return v1Data as unknown as v2.ToClient; -}; - -// Converter from v2 to v1: Remove queueSize field from Init, filter out QueueUpdated -const v2ToClientToV1 = (v2Data: v2.ToClient): v1.ToClient => { - if (v2Data.body.tag === "Init") { - const init = v2Data.body.val; - const { queueSize, ...rest } = init; + if ( + v1Data.body.tag === "EventsUpdated" || + v1Data.body.tag === "EventsResponse" + ) { return { body: { - tag: "Init", - val: rest, + tag: "Error", + val: { + message: EVENTS_DROPPED_ERROR, + }, }, }; } - // QueueUpdated doesn't exist in v1, so we can't convert it - if (v2Data.body.tag === "QueueUpdated") { - throw new Error("Cannot convert QueueUpdated to v1"); - } - return v2Data as unknown as v1.ToClient; + return v1Data as unknown as v2.ToClient; }; -// Converter from v2 to v3: Remove events from Init, drop event updates -const v2ToClientToV3 = (v2Data: v2.ToClient): v3.ToClient => { +// Converter from v2 to v1: Add empty events to Init, drop newer updates +const v2ToClientToV1 = (v2Data: v2.ToClient): v1.ToClient => { if (v2Data.body.tag === "Init") { const init = v2Data.body.val; - const { events, ...rest } = init; return { body: { tag: "Init", - val: rest, + val: { + connections: init.connections, + events: [], + state: init.state, + isStateEnabled: init.isStateEnabled, + rpcs: init.rpcs, + isDatabaseEnabled: init.isDatabaseEnabled, + }, }, }; } if ( - v2Data.body.tag === "EventsUpdated" || - v2Data.body.tag === "EventsResponse" + v2Data.body.tag === "WorkflowHistoryUpdated" || + v2Data.body.tag === "WorkflowHistoryResponse" ) { return { body: { tag: "Error", val: { - message: EVENTS_DROPPED_ERROR, + message: WORKFLOW_HISTORY_DROPPED_ERROR, }, }, }; } - return v2Data as unknown as v3.ToClient; -}; - -// Converter from v3 to v2: Add empty events to Init, drop TraceQueryResponse -const v3ToClientToV2 = (v3Data: v3.ToClient): v2.ToClient => { - if (v3Data.body.tag === "Init") { - const init = v3Data.body.val; + if (v2Data.body.tag === "QueueUpdated") { return { body: { - tag: "Init", + tag: "Error", val: { - ...init, - events: [], + message: QUEUE_DROPPED_ERROR, }, }, }; } - if (v3Data.body.tag === "TraceQueryResponse") { - throw new Error("Cannot convert TraceQueryResponse to v2"); + if (v2Data.body.tag === "QueueResponse") { + return { + body: { + tag: "Error", + val: { + message: QUEUE_DROPPED_ERROR, + }, + }, + }; } - return v3Data as unknown as v2.ToClient; -}; - -// Converter from v3 to v4: No changes to client structure -const v3ToClientToV4 = (v3Data: v3.ToClient): v4.ToClient => { - return v3Data as unknown as v4.ToClient; -}; - -// Converter from v4 to v3: Drop queue responses -const v4ToClientToV3 = (v4Data: v4.ToClient): v3.ToClient => { - if (v4Data.body.tag === "QueueResponse") { - throw new Error("Cannot convert QueueResponse to v3"); + if (v2Data.body.tag === "TraceQueryResponse") { + return { + body: { + tag: "Error", + val: { + message: TRACE_DROPPED_ERROR, + }, + }, + }; } - return v4Data as unknown as v3.ToClient; + return v2Data as unknown as v1.ToClient; }; -// ToServer is identical between v1 and v2 +// Converter from v1 to v2: Drop events requests const v1ToServerToV2 = (v1Data: v1.ToServer): v2.ToServer => { + if ( + v1Data.body.tag === "EventsRequest" || + v1Data.body.tag === "ClearEventsRequest" + ) { + throw new Error("Cannot convert events requests to v2"); + } return v1Data as unknown as v2.ToServer; }; +// Converter from v2 to v1: Drop newer requests const v2ToServerToV1 = (v2Data: v2.ToServer): v1.ToServer => { - return v2Data as unknown as v1.ToServer; -}; - -// Converter from v2 to v3: Drop events requests -const v2ToServerToV3 = (v2Data: v2.ToServer): v3.ToServer => { if ( - v2Data.body.tag === "EventsRequest" || - v2Data.body.tag === "ClearEventsRequest" + v2Data.body.tag === "TraceQueryRequest" || + v2Data.body.tag === "QueueRequest" || + v2Data.body.tag === "WorkflowHistoryRequest" ) { - throw new Error("Cannot convert events requests to v3"); + throw new Error("Cannot convert v2-only requests to v1"); } - return v2Data as unknown as v3.ToServer; -}; - -// Converter from v3 to v2: Drop trace query -const v3ToServerToV2 = (v3Data: v3.ToServer): v2.ToServer => { - if (v3Data.body.tag === "TraceQueryRequest") { - throw new Error("Cannot convert TraceQueryRequest to v2"); - } - return v3Data as unknown as v2.ToServer; -}; - -// Converter from v3 to v4: No changes to server structure -const v3ToServerToV4 = (v3Data: v3.ToServer): v4.ToServer => { - return v3Data as unknown as v4.ToServer; -}; - -// Converter from v4 to v3: Drop queue request -const v4ToServerToV3 = (v4Data: v4.ToServer): v3.ToServer => { - if (v4Data.body.tag === "QueueRequest") { - throw new Error("Cannot convert QueueRequest to v3"); - } - return v4Data as unknown as v3.ToServer; + return v2Data as unknown as v1.ToServer; }; -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: return v1.encodeToServer(data as v1.ToServer); case 2: return v2.encodeToServer(data as v2.ToServer); - case 3: - return v3.encodeToServer(data); - case 4: - return v4.encodeToServer(data); default: throw new Error(`Unknown version ${version}`); } @@ -168,29 +150,21 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ return v1.decodeToServer(bytes); case 2: return v2.decodeToServer(bytes); - case 3: - return v3.decodeToServer(bytes); - case 4: - return v4.decodeToServer(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToServerToV2, v2ToServerToV3, v3ToServerToV4], - serializeConverters: () => [v4ToServerToV3, v3ToServerToV2, v2ToServerToV1], + deserializeConverters: () => [v1ToServerToV2], + serializeConverters: () => [v2ToServerToV1], }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: return v1.encodeToClient(data as v1.ToClient); case 2: return v2.encodeToClient(data as v2.ToClient); - case 3: - return v3.encodeToClient(data); - case 4: - return v4.encodeToClient(data); default: throw new Error(`Unknown version ${version}`); } @@ -201,14 +175,10 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ return v1.decodeToClient(bytes); case 2: return v2.decodeToClient(bytes); - case 3: - return v3.decodeToClient(bytes); - case 4: - return v4.decodeToClient(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToClientToV2, v2ToClientToV3, v3ToClientToV4], - serializeConverters: () => [v4ToClientToV3, v3ToClientToV2, v2ToClientToV1], + deserializeConverters: () => [v1ToClientToV2], + serializeConverters: () => [v2ToClientToV1], }); diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/persist/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/persist/mod.ts new file mode 100644 index 0000000000..0a5768dc74 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/persist/mod.ts @@ -0,0 +1 @@ +export * from "../../../dist/schemas/persist/v1"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/transport/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/transport/mod.ts new file mode 100644 index 0000000000..cd0bd2d2bc --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/transport/mod.ts @@ -0,0 +1 @@ +export * from "../../../dist/schemas/transport/v1"; diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts new file mode 100644 index 0000000000..8010073cd9 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts @@ -0,0 +1,268 @@ +import * as cbor from "cbor-x"; +import { createNanoEvents } from "nanoevents"; +import type { + BranchStatus, + BranchStatusType, + EntryKind, + EntryStatus, + Location, + SleepState, + WorkflowHistoryEntry, + WorkflowHistorySnapshot, + WorkflowEntryMetadataSnapshot, +} from "@rivetkit/workflow-engine"; +import { encodeWorkflowHistoryTransport } from "@/inspector/transport"; +import type * as inspectorSchema from "@/schemas/actor-inspector/mod"; +import * as transport from "@/schemas/transport/mod"; +import { assertUnreachable, bufferToArrayBuffer } from "@/utils"; + +export interface WorkflowInspectorAdapter { + getHistory: () => inspectorSchema.WorkflowHistory | null; + onHistoryUpdated: ( + listener: (history: inspectorSchema.WorkflowHistory) => void, + ) => () => void; +} + +export function createWorkflowInspectorAdapter(): { + adapter: WorkflowInspectorAdapter; + update: (snapshot: WorkflowHistorySnapshot) => void; +} { + const emitter = createNanoEvents<{ + updated: (history: inspectorSchema.WorkflowHistory) => void; + }>(); + let history: inspectorSchema.WorkflowHistory | null = null; + + const adapter: WorkflowInspectorAdapter = { + getHistory: () => history, + onHistoryUpdated: (listener) => emitter.on("updated", listener), + }; + + const update = (snapshot: WorkflowHistorySnapshot) => { + const transportHistory = toWorkflowHistory(snapshot); + const next = encodeWorkflowHistoryTransport(transportHistory); + history = next; + emitter.emit("updated", next); + }; + + return { adapter, update }; +} + +function encodeCbor(value: unknown): ArrayBuffer { + return bufferToArrayBuffer(cbor.encode(value)); +} + +function encodeOptionalCbor(value: unknown): ArrayBuffer | null { + if (value === undefined) { + return null; + } + return encodeCbor(value); +} + +function toU64(value: number): bigint { + return BigInt(Math.max(0, Math.floor(value))); +} + +function toWorkflowLocation( + location: Location, +): transport.WorkflowLocation { + return location.map((segment) => { + if (typeof segment === "number") { + return { tag: "WorkflowNameIndex", val: segment }; + } + return { + tag: "WorkflowLoopIterationMarker", + val: { + loop: segment.loop, + iteration: segment.iteration, + }, + }; + }); +} + +function toWorkflowEntryKind( + kind: EntryKind, +): transport.WorkflowEntryKind { + switch (kind.type) { + case "step": + return { + tag: "WorkflowStepEntry", + val: { + output: encodeOptionalCbor(kind.data.output), + error: kind.data.error ?? null, + }, + }; + case "loop": + return { + tag: "WorkflowLoopEntry", + val: { + state: encodeCbor(kind.data.state), + iteration: kind.data.iteration, + output: encodeOptionalCbor(kind.data.output), + }, + }; + case "sleep": + return { + tag: "WorkflowSleepEntry", + val: { + deadline: toU64(kind.data.deadline), + state: toWorkflowSleepState(kind.data.state), + }, + }; + case "message": + return { + tag: "WorkflowMessageEntry", + val: { + name: kind.data.name, + messageData: encodeCbor(kind.data.data), + }, + }; + case "rollback_checkpoint": + return { + tag: "WorkflowRollbackCheckpointEntry", + val: { name: kind.data.name }, + }; + case "join": + return { + tag: "WorkflowJoinEntry", + val: { branches: toWorkflowBranchStatusMap(kind.data.branches) }, + }; + case "race": + return { + tag: "WorkflowRaceEntry", + val: { + winner: kind.data.winner ?? null, + branches: toWorkflowBranchStatusMap(kind.data.branches), + }, + }; + case "removed": + return { + tag: "WorkflowRemovedEntry", + val: { + originalType: kind.data.originalType, + originalName: kind.data.originalName ?? null, + }, + }; + default: + assertUnreachable(kind as never); + } +} + +function toWorkflowEntry( + entry: WorkflowHistoryEntry, +): transport.WorkflowEntry { + return { + id: entry.id, + location: toWorkflowLocation(entry.location), + kind: toWorkflowEntryKind(entry.kind), + }; +} + +function toWorkflowEntryStatus( + status: EntryStatus, +): transport.WorkflowEntryStatus { + switch (status) { + case "pending": + return transport.WorkflowEntryStatus.PENDING; + case "running": + return transport.WorkflowEntryStatus.RUNNING; + case "completed": + return transport.WorkflowEntryStatus.COMPLETED; + case "failed": + return transport.WorkflowEntryStatus.FAILED; + case "exhausted": + return transport.WorkflowEntryStatus.EXHAUSTED; + default: + assertUnreachable(status as never); + } +} + +function toWorkflowSleepState( + state: SleepState, +): transport.WorkflowSleepState { + switch (state) { + case "pending": + return transport.WorkflowSleepState.PENDING; + case "completed": + return transport.WorkflowSleepState.COMPLETED; + case "interrupted": + return transport.WorkflowSleepState.INTERRUPTED; + default: + assertUnreachable(state as never); + } +} + +function toWorkflowBranchStatusType( + status: BranchStatusType, +): transport.WorkflowBranchStatusType { + switch (status) { + case "pending": + return transport.WorkflowBranchStatusType.PENDING; + case "running": + return transport.WorkflowBranchStatusType.RUNNING; + case "completed": + return transport.WorkflowBranchStatusType.COMPLETED; + case "failed": + return transport.WorkflowBranchStatusType.FAILED; + case "cancelled": + return transport.WorkflowBranchStatusType.CANCELLED; + default: + assertUnreachable(status as never); + } +} + +function toWorkflowBranchStatus( + status: BranchStatus, +): transport.WorkflowBranchStatus { + return { + status: toWorkflowBranchStatusType(status.status), + output: encodeOptionalCbor(status.output), + error: status.error ?? null, + }; +} + +function toWorkflowBranchStatusMap( + branches: Record, +): ReadonlyMap { + return new Map( + Object.entries(branches).map(([name, status]) => [ + name, + toWorkflowBranchStatus(status), + ]), + ); +} + +function toWorkflowEntryMetadata( + metadata: WorkflowEntryMetadataSnapshot, +): transport.WorkflowEntryMetadata { + return { + status: toWorkflowEntryStatus(metadata.status), + error: metadata.error ?? null, + attempts: metadata.attempts, + lastAttemptAt: toU64(metadata.lastAttemptAt), + createdAt: toU64(metadata.createdAt), + completedAt: + metadata.completedAt === undefined + ? null + : toU64(metadata.completedAt), + rollbackCompletedAt: + metadata.rollbackCompletedAt === undefined + ? null + : toU64(metadata.rollbackCompletedAt), + rollbackError: metadata.rollbackError ?? null, + }; +} + +function toWorkflowHistory( + snapshot: WorkflowHistorySnapshot, +): transport.WorkflowHistory { + const entryMetadata = new Map(); + for (const [id, metadata] of snapshot.entryMetadata) { + entryMetadata.set(id, toWorkflowEntryMetadata(metadata)); + } + + return { + nameRegistry: snapshot.nameRegistry, + entries: snapshot.entries.map((entry) => toWorkflowEntry(entry)), + entryMetadata, + }; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts index 8ba51c3329..75bb637e28 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts @@ -8,6 +8,7 @@ import { runWorkflow } from "@rivetkit/workflow-engine"; import invariant from "invariant"; import { ActorWorkflowContext } from "./context"; import { ActorWorkflowDriver, workflowQueueName } from "./driver"; +import { createWorkflowInspectorAdapter } from "./inspector"; export { Loop } from "@rivetkit/workflow-engine"; export { workflowQueueName } from "./driver"; @@ -32,6 +33,8 @@ export function workflow< >, ) => Promise, ): RunConfig { + const workflowInspector = createWorkflowInspectorAdapter(); + async function run( runCtx: RunContext< TState, @@ -56,7 +59,11 @@ export function workflow< async (ctx) => await fn(new ActorWorkflowContext(ctx, runCtx)), undefined, driver, - { mode: "live", logger: runCtx.log }, + { + mode: "live", + logger: runCtx.log, + onHistoryUpdated: workflowInspector.update, + }, ); runCtx.abortSignal.addEventListener( @@ -87,6 +94,7 @@ export function workflow< return { icon: "diagram-project", - run, + run: run as RunConfig["run"], + inspector: { workflow: workflowInspector.adapter }, }; } diff --git a/rivetkit-typescript/packages/rivetkit/tsconfig.json b/rivetkit-typescript/packages/rivetkit/tsconfig.json index f1223270fc..ed8967e718 100644 --- a/rivetkit-typescript/packages/rivetkit/tsconfig.json +++ b/rivetkit-typescript/packages/rivetkit/tsconfig.json @@ -5,6 +5,8 @@ "resolveJsonModule": true, "paths": { "@/*": ["./src/*"], + "@rivetkit/workflow-engine": ["../workflow-engine/src/index.ts"], + "@rivetkit/traces": ["../traces/src/index.ts"], // Used for test fixtures "rivetkit": ["./src/mod.ts"], "rivetkit/utils": ["./src/utils.ts"] diff --git a/rivetkit-typescript/packages/workflow-engine/CLAUDE.md b/rivetkit-typescript/packages/workflow-engine/CLAUDE.md new file mode 100644 index 0000000000..007a7a3387 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/CLAUDE.md @@ -0,0 +1,13 @@ +# Workflow Engine Guidance + +## Persist Schema Sync + +The workflow engine persistence schema is duplicated in RivetKit for inspector transport. +When updating `schemas/v1.bare` in this package, you must update the mirror at: + +- `rivetkit-typescript/packages/rivetkit/schemas/persist/v1.bare` + +After updating both, rebuild schemas: + +- `pnpm -C rivetkit-typescript/packages/workflow-engine run compile:bare` +- `pnpm -C rivetkit-typescript/packages/rivetkit run build:schema` diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index 2c03d54e7a..1fc0ae6cb4 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -109,6 +109,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { private rollbackCheckpointSet: boolean; /** Track names used in current execution to detect duplicates */ private usedNamesInExecution = new Set(); + private historyNotifier?: () => void; private logger?: Logger; constructor( @@ -121,6 +122,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { mode: "forward" | "rollback" = "forward", rollbackActions?: RollbackAction[], rollbackCheckpointSet = false, + historyNotifier?: () => void, logger?: Logger, ) { this.currentLocation = location; @@ -128,6 +130,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.mode = mode; this.rollbackActions = rollbackActions; this.rollbackCheckpointSet = rollbackCheckpointSet; + this.historyNotifier = historyNotifier; this.logger = logger; } @@ -151,6 +154,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } } + private async flushStorage(): Promise { + await flush(this.storage, this.driver, this.historyNotifier); + } + /** * Create a new branch context for parallel/nested execution. */ @@ -168,6 +175,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.mode, this.rollbackActions, this.rollbackCheckpointSet, + this.historyNotifier, this.logger, ); } @@ -441,7 +449,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { // is to batch writes, not to avoid persistence entirely. if (!config.ephemeral) { this.log("debug", { msg: "flushing step", step: config.name, key }); - await flush(this.storage, this.driver); + await this.flushStorage(); } this.log("debug", { msg: "step completed", step: config.name, key }); @@ -454,7 +462,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } entry.dirty = true; metadata.status = "exhausted"; - await flush(this.storage, this.driver); + await this.flushStorage(); throw new CriticalError(error.message); } @@ -467,7 +475,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } entry.dirty = true; metadata.status = "exhausted"; - await flush(this.storage, this.driver); + await this.flushStorage(); throw error; } @@ -477,7 +485,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { entry.dirty = true; metadata.status = "failed"; - await flush(this.storage, this.driver); + await this.flushStorage(); throw new StepFailedError(config.name, error, metadata.attempts); } @@ -695,7 +703,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); await this.forgetOldIterations( location, iteration + 1, @@ -726,7 +734,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); await this.forgetOldIterations( location, iteration, @@ -781,6 +789,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.storage, this.driver, iterationLocation, + this.historyNotifier, ); } } @@ -848,7 +857,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); setEntry(this.storage, location, entry); entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); } const now = Date.now(); @@ -860,7 +869,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { entry.kind.data.state = "completed"; } entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); return; } @@ -874,7 +883,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { entry.kind.data.state = "completed"; } entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); return; } @@ -927,7 +936,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); setEntry(this.storage, location, entry); entry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); this.rollbackCheckpointSet = true; } @@ -1045,7 +1054,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); setEntry(this.storage, countLocation, countEntry); - await flush(this.storage, this.driver); + await this.flushStorage(); return messages.map((message) => message.data as T); } @@ -1147,7 +1156,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); setEntry(this.storage, sleepLocation, sleepEntry); sleepEntry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); } const now = Date.now(); @@ -1173,7 +1182,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { data: { name: messageName, data: message.data }, }); setEntry(this.storage, messageLocation, messageEntry); - await flush(this.storage, this.driver); + await this.flushStorage(); return message.data as T; } @@ -1182,7 +1191,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { sleepEntry.kind.data.state = "completed"; } sleepEntry.dirty = true; - await flush(this.storage, this.driver); + await this.flushStorage(); return null; } @@ -1204,7 +1213,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { data: { name: messageName, data: message.data }, }); setEntry(this.storage, messageLocation, messageEntry); - await flush(this.storage, this.driver); + await this.flushStorage(); return message.data as T; } @@ -1272,7 +1281,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { setEntry(this.storage, sleepLocation, sleepEntry); sleepEntry.dirty = true; // Flush immediately to persist deadline before potential SleepError - await flush(this.storage, this.driver); + await this.flushStorage(); } return this.executeListenNUntilImpl( @@ -1407,7 +1416,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); setEntry(this.storage, countLocation, countEntry); - await flush(this.storage, this.driver); + await this.flushStorage(); return results; } @@ -1469,7 +1478,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { setEntry(this.storage, location, entry); entry.dirty = true; // Flush immediately to persist entry before branches execute - await flush(this.storage, this.driver); + await this.flushStorage(); } if (entry.kind.type !== "join") { @@ -1533,7 +1542,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { // Wait for ALL branches (no short-circuit on error) await Promise.allSettled(branchPromises); - await flush(this.storage, this.driver); + await this.flushStorage(); // Throw if any branches failed if (Object.keys(errors).length > 0) { @@ -1620,7 +1629,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { setEntry(this.storage, location, entry); entry.dirty = true; // Flush immediately to persist entry before branches execute - await flush(this.storage, this.driver); + await this.flushStorage(); } if (entry.kind.type !== "race") { @@ -1788,7 +1797,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { // If any branch needs to yield to the scheduler (sleep/message wait), // save state and re-throw the error to exit the workflow execution if (yieldError && !settled) { - await flush(this.storage, this.driver); + await this.flushStorage(); throw yieldError; } @@ -1805,13 +1814,14 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.storage, this.driver, branchLocation, + this.historyNotifier, ); } } } // Flush final state - await flush(this.storage, this.driver); + await this.flushStorage(); // Log late errors if any (these occurred after a winner was determined) if (lateErrors.length > 0) { @@ -1887,6 +1897,6 @@ export class WorkflowContextImpl implements WorkflowContextInterface { data: { originalType, originalName: name }, }); setEntry(this.storage, location, entry); - await flush(this.storage, this.driver); + await this.flushStorage(); } } diff --git a/rivetkit-typescript/packages/workflow-engine/src/index.ts b/rivetkit-typescript/packages/workflow-engine/src/index.ts index 62eb0a8c27..5e158f0016 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/index.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/index.ts @@ -52,6 +52,7 @@ export { consumeMessage, consumeMessages, createEntry, + createHistorySnapshot, createStorage, deleteEntriesWithPrefix, flush, @@ -66,6 +67,7 @@ export type { BranchConfig, BranchOutput, BranchStatus, + BranchStatusType, Entry, EntryKind, EntryKindType, @@ -84,6 +86,9 @@ export type { PathSegment, RaceEntry, RemovedEntry, + WorkflowEntryMetadataSnapshot, + WorkflowHistoryEntry, + WorkflowHistorySnapshot, RollbackCheckpointEntry, RollbackContextInterface, RunWorkflowOptions, @@ -142,6 +147,7 @@ import { } from "./keys.js"; import { createDefaultMessageDriver, + createHistorySnapshot, flush, generateId, loadMetadata, @@ -151,6 +157,7 @@ import type { RollbackContextInterface, RunWorkflowOptions, Storage, + WorkflowHistorySnapshot, WorkflowFunction, WorkflowHandle, WorkflowMessageDriver, @@ -177,6 +184,8 @@ interface LiveRuntime { isSleeping: boolean; } +type HistoryNotifier = (() => void) | undefined; + function createLiveRuntime(): LiveRuntime { return { pendingMessageNames: [], @@ -308,6 +317,7 @@ async function executeRollback( messageDriver: WorkflowMessageDriver, abortController: AbortController, storage: Storage, + historyNotifier?: HistoryNotifier, logger?: Logger, ): Promise { const rollbackActions: RollbackAction[] = []; @@ -321,6 +331,7 @@ async function executeRollback( "rollback", rollbackActions, false, + historyNotifier, logger, ); @@ -370,7 +381,7 @@ async function executeRollback( throw error; } finally { metadata.dirty = true; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); } } } @@ -381,9 +392,10 @@ async function setSleepState( workflowId: string, deadline: number, messageNames?: string[], + historyNotifier?: HistoryNotifier, ): Promise> { storage.state = "sleeping"; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); await driver.setAlarm(workflowId, deadline); return { @@ -397,9 +409,10 @@ async function setMessageWaitState( storage: Storage, driver: EngineDriver, messageNames: string[], + historyNotifier?: HistoryNotifier, ): Promise> { storage.state = "sleeping"; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); return { state: "sleeping", waitingForMessages: messageNames }; } @@ -407,8 +420,9 @@ async function setMessageWaitState( async function setEvictedState( storage: Storage, driver: EngineDriver, + historyNotifier?: HistoryNotifier, ): Promise> { - await flush(storage, driver); + await flush(storage, driver, historyNotifier); return { state: storage.state }; } @@ -416,9 +430,10 @@ async function setRetryState( storage: Storage, driver: EngineDriver, workflowId: string, + historyNotifier?: HistoryNotifier, ): Promise> { storage.state = "sleeping"; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); const retryAt = Date.now() + 100; await driver.setAlarm(workflowId, retryAt); @@ -430,10 +445,11 @@ async function setFailedState( storage: Storage, driver: EngineDriver, error: unknown, + historyNotifier?: HistoryNotifier, ): Promise { storage.state = "failed"; storage.error = extractErrorInfo(error); - await flush(storage, driver); + await flush(storage, driver, historyNotifier); } async function waitForSleep( @@ -486,6 +502,7 @@ async function executeLiveWorkflow( messageDriver: WorkflowMessageDriver, abortController: AbortController, runtime: LiveRuntime, + onHistoryUpdated?: (history: WorkflowHistorySnapshot) => void, logger?: Logger, ): Promise> { let lastResult: WorkflowResult | undefined; @@ -498,6 +515,7 @@ async function executeLiveWorkflow( driver, messageDriver, abortController, + onHistoryUpdated, logger, ); lastResult = result; @@ -612,6 +630,7 @@ export function runWorkflow( messageDriver, abortController, liveRuntime, + options.onHistoryUpdated, logger, ) : executeWorkflow( @@ -621,6 +640,7 @@ export function runWorkflow( driver, messageDriver, abortController, + options.onHistoryUpdated, logger, ); @@ -751,9 +771,16 @@ async function executeWorkflow( driver: EngineDriver, messageDriver: WorkflowMessageDriver, abortController: AbortController, + onHistoryUpdated?: (history: WorkflowHistorySnapshot) => void, logger?: Logger, ): Promise> { const storage = await loadStorage(driver, messageDriver); + const historyNotifier: HistoryNotifier = onHistoryUpdated + ? () => onHistoryUpdated(createHistorySnapshot(storage)) + : undefined; + if (historyNotifier) { + historyNotifier(); + } if (logger) { const entryKeys = Array.from(storage.history.entries.keys()); @@ -797,6 +824,7 @@ async function executeWorkflow( messageDriver, abortController, storage, + historyNotifier, logger, ); } catch (error) { @@ -807,7 +835,7 @@ async function executeWorkflow( } storage.state = "failed"; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); const storedError = storage.error ? new Error(storage.error.message) @@ -828,6 +856,7 @@ async function executeWorkflow( "forward", undefined, false, + historyNotifier, logger, ); @@ -838,7 +867,7 @@ async function executeWorkflow( storage.state = "completed"; storage.output = output; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); await driver.clearAlarm(workflowId); return { state: "completed", output }; @@ -850,6 +879,7 @@ async function executeWorkflow( workflowId, error.deadline, error.messageNames, + historyNotifier, ); } @@ -858,38 +888,45 @@ async function executeWorkflow( storage, driver, error.messageNames, + historyNotifier, ); } if (error instanceof EvictedError) { - return await setEvictedState(storage, driver); + return await setEvictedState(storage, driver, historyNotifier); } if (error instanceof StepFailedError) { - return await setRetryState(storage, driver, workflowId); + return await setRetryState( + storage, + driver, + workflowId, + historyNotifier, + ); } if (error instanceof RollbackCheckpointError) { - await setFailedState(storage, driver, error); + await setFailedState(storage, driver, error, historyNotifier); throw error; } // Unrecoverable error storage.error = extractErrorInfo(error); - storage.state = "rolling_back"; - await flush(storage, driver); + storage.state = "rolling_back"; + await flush(storage, driver, historyNotifier); - try { - await executeRollback( - workflowId, - workflowFn, - effectiveInput, - driver, - messageDriver, - abortController, - storage, - logger, - ); + try { + await executeRollback( + workflowId, + workflowFn, + effectiveInput, + driver, + messageDriver, + abortController, + storage, + historyNotifier, + logger, + ); } catch (rollbackError) { if (rollbackError instanceof EvictedError) { return { state: storage.state }; @@ -898,7 +935,7 @@ async function executeWorkflow( } storage.state = "failed"; - await flush(storage, driver); + await flush(storage, driver, historyNotifier); throw error; } diff --git a/rivetkit-typescript/packages/workflow-engine/src/storage.ts b/rivetkit-typescript/packages/workflow-engine/src/storage.ts index 1cea069d30..c981e3e0b8 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/storage.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/storage.ts @@ -38,6 +38,9 @@ import type { Location, Message, Storage, + WorkflowEntryMetadataSnapshot, + WorkflowHistoryEntry, + WorkflowHistorySnapshot, WorkflowMessageDriver, } from "./types.js"; @@ -60,6 +63,34 @@ export function createStorage(): Storage { }; } +/** + * Create a snapshot of workflow history for observers. + */ +export function createHistorySnapshot( + storage: Storage, +): WorkflowHistorySnapshot { + const entryMetadata = new Map(); + for (const [id, metadata] of storage.entryMetadata) { + const { dirty, ...rest } = metadata; + entryMetadata.set(id, rest); + } + + const entries: WorkflowHistoryEntry[] = []; + const entryKeys = Array.from(storage.history.entries.keys()).sort(); + for (const key of entryKeys) { + const entry = storage.history.entries.get(key); + if (!entry) continue; + const { dirty, ...rest } = entry; + entries.push(rest); + } + + return { + nameRegistry: [...storage.nameRegistry], + entries, + entryMetadata, + }; +} + /** * Generate a UUID v4. */ @@ -234,8 +265,10 @@ export async function loadMetadata( export async function flush( storage: Storage, driver: EngineDriver, + onHistoryUpdated?: () => void, ): Promise { const writes: KVWrite[] = []; + let historyUpdated = false; // Flush only new names (those added since last flush) for ( @@ -249,6 +282,7 @@ export async function flush( key: buildNameKey(i), value: serializeName(name), }); + historyUpdated = true; } } @@ -260,6 +294,7 @@ export async function flush( value: serializeEntry(entry), }); entry.dirty = false; + historyUpdated = true; } } @@ -271,6 +306,7 @@ export async function flush( value: serializeEntryMetadata(metadata), }); metadata.dirty = false; + historyUpdated = true; } } @@ -315,6 +351,10 @@ export async function flush( storage.flushedState = storage.state; storage.flushedOutput = storage.output; storage.flushedError = storage.error; + + if (historyUpdated && onHistoryUpdated) { + onHistoryUpdated(); + } } /** @@ -413,6 +453,7 @@ export async function deleteEntriesWithPrefix( storage: Storage, driver: EngineDriver, prefixLocation: Location, + onHistoryUpdated?: () => void, ): Promise { // Collect entry IDs for metadata cleanup const entryIds: string[] = []; @@ -434,6 +475,10 @@ export async function deleteEntriesWithPrefix( await Promise.all( entryIds.map((id) => driver.delete(buildEntryMetadataKey(id))), ); + + if (entryIds.length > 0 && onHistoryUpdated) { + onHistoryUpdated(); + } } /** diff --git a/rivetkit-typescript/packages/workflow-engine/src/types.ts b/rivetkit-typescript/packages/workflow-engine/src/types.ts index 2d7db85282..c21fd04821 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/types.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/types.ts @@ -204,6 +204,25 @@ export interface History { entries: Map; } +/** + * History entry snapshot without internal dirty flags. + */ +export type WorkflowHistoryEntry = Omit; + +/** + * Entry metadata snapshot without internal dirty flags. + */ +export type WorkflowEntryMetadataSnapshot = Omit; + +/** + * Snapshot of workflow history for observers. + */ +export interface WorkflowHistorySnapshot { + nameRegistry: string[]; + entries: WorkflowHistoryEntry[]; + entryMetadata: ReadonlyMap; +} + /** * Structured error information for workflow failures. */ @@ -382,6 +401,7 @@ export type WorkflowRunMode = "yield" | "live"; export interface RunWorkflowOptions { mode?: WorkflowRunMode; logger?: Logger; + onHistoryUpdated?: (history: WorkflowHistorySnapshot) => void; } export type WorkflowFunction = ( diff --git a/rivetkit-typescript/packages/workflow-engine/tests/history-updates.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/history-updates.test.ts new file mode 100644 index 0000000000..93de7e48a8 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/tests/history-updates.test.ts @@ -0,0 +1,54 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { + InMemoryDriver, + runWorkflow, + type WorkflowContextInterface, + type WorkflowHistorySnapshot, +} from "../src/testing.js"; + +const modes = ["yield", "live"] as const; + +for (const mode of modes) { + describe(`Workflow History Updates (${mode})`, { sequential: true }, () => { + let driver: InMemoryDriver; + + beforeEach(() => { + driver = new InMemoryDriver(); + driver.latency = 0; + }); + + it("emits history snapshots when entries change", async () => { + const updates: WorkflowHistorySnapshot[] = []; + + const workflow = async (ctx: WorkflowContextInterface) => { + await ctx.step("hello", async () => "world"); + }; + + const result = await runWorkflow( + "wf-history-1", + workflow, + undefined, + driver, + { + mode, + onHistoryUpdated: (snapshot) => { + updates.push(snapshot); + }, + }, + ).result; + + expect(result.state).toBe("completed"); + expect(updates.length).toBeGreaterThan(0); + + const last = updates[updates.length - 1]; + expect( + last.entries.some((entry) => entry.kind.type === "step"), + ).toBe(true); + expect( + Array.from(last.entryMetadata.values()).some( + (meta) => meta.status === "completed", + ), + ).toBe(true); + }); + }); +}