diff --git a/examples/sandbox-vercel/src/actors/workflow/approval.ts b/examples/sandbox-vercel/src/actors/workflow/approval.ts index 4c7cc97d74..86070ec7eb 100644 --- a/examples/sandbox-vercel/src/actors/workflow/approval.ts +++ b/examples/sandbox-vercel/src/actors/workflow/approval.ts @@ -1,9 +1,9 @@ -// APPROVAL REQUEST (Listen Demo) -// Demonstrates: Message listening with timeout for approval workflows +// APPROVAL REQUEST (Queue Wait Demo) +// Demonstrates: Queue waits with timeout for approval workflows // One actor per approval request - actor key is the request ID import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; export type RequestStatus = "pending" | "approved" | "rejected" | "timeout"; @@ -21,7 +21,7 @@ export type ApprovalRequest = { type State = ApprovalRequest; -const QUEUE_DECISION = workflowQueueName("decision"); +const QUEUE_DECISION = "decision" as const; const APPROVAL_TIMEOUT_MS = 30000; @@ -44,7 +44,7 @@ export const approval = actor({ createdAt: Date.now(), }), queues: { - [QUEUE_DECISION]: queue(), + decision: queue(), }, events: { requestUpdated: event(), @@ -84,11 +84,14 @@ export const approval = actor({ c.broadcast("requestCreated", c.state); }); - const decision = await loopCtx.listenWithTimeout( + const [decisionMessage] = await loopCtx.queue.next( "wait-decision", - "decision", - APPROVAL_TIMEOUT_MS, + { + names: [QUEUE_DECISION], + timeout: APPROVAL_TIMEOUT_MS, + }, ); + const decision = decisionMessage?.body ?? null; await loopCtx.step("update-status", async () => { c.state.deciding = false; diff --git a/examples/sandbox-vercel/src/actors/workflow/dashboard.ts b/examples/sandbox-vercel/src/actors/workflow/dashboard.ts index fbe1bc68e7..449e38244f 100644 --- a/examples/sandbox-vercel/src/actors/workflow/dashboard.ts +++ b/examples/sandbox-vercel/src/actors/workflow/dashboard.ts @@ -2,7 +2,7 @@ // Demonstrates: Parallel data fetching with join (wait-all) import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; export type UserStats = { @@ -45,7 +45,7 @@ export type DashboardState = { type State = DashboardState; -const QUEUE_REFRESH = workflowQueueName("refresh"); +const QUEUE_REFRESH = "refresh"; type RefreshMessage = Record; async function fetchUserStats(): Promise { @@ -119,7 +119,9 @@ export const dashboard = actor({ run: async (loopCtx) => { const c = actorCtx(loopCtx); - await loopCtx.listen("wait-refresh", "refresh"); + await loopCtx.queue.next("wait-refresh", { + names: [QUEUE_REFRESH], + }); ctx.log.info({ msg: "starting dashboard refresh" }); diff --git a/examples/sandbox-vercel/src/actors/workflow/history-examples.ts b/examples/sandbox-vercel/src/actors/workflow/history-examples.ts index 9d03824998..b70a1f1f19 100644 --- a/examples/sandbox-vercel/src/actors/workflow/history-examples.ts +++ b/examples/sandbox-vercel/src/actors/workflow/history-examples.ts @@ -1,5 +1,5 @@ import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; function delay(ms: number): Promise { @@ -250,12 +250,12 @@ export type WorkflowHistoryFullState = { completedAt?: number; }; -const QUEUE_ORDER_CREATED = workflowQueueName("order:created"); -const QUEUE_ORDER_UPDATED = workflowQueueName("order:updated"); -const QUEUE_ORDER_ITEM = workflowQueueName("order:item"); -const QUEUE_ORDER_ARTIFACT = workflowQueueName("order:artifact"); -const QUEUE_ORDER_READY = workflowQueueName("order:ready"); -const QUEUE_ORDER_OPTIONAL = workflowQueueName("order:optional"); +const QUEUE_ORDER_CREATED = "order:created"; +const QUEUE_ORDER_UPDATED = "order:updated"; +const QUEUE_ORDER_ITEM = "order:item"; +const QUEUE_ORDER_ARTIFACT = "order:artifact"; +const QUEUE_ORDER_READY = "order:ready"; +const QUEUE_ORDER_OPTIONAL = "order:optional"; type OrderCreatedMessage = { id: string }; type OrderUpdatedMessage = { id: string; status: string }; @@ -408,31 +408,36 @@ export const workflowHistoryFull = actor({ return { readyBy, readyBatchBy }; }); - await ctx.listenN("listen-order-created", "order:created", 1); - await ctx.listenWithTimeout( - "listen-order-updated-timeout", - "order:updated", - 250, - ); - await ctx.listenN("listen-batch-two", "order:item", 2); - await ctx.listenNWithTimeout( - "listen-artifacts-timeout", - "order:artifact", - 3, - 300, - ); - await ctx.listenWithTimeout("listen-optional", "order:optional", 200); - await ctx.listenUntil( - "listen-until", - "order:ready", - Date.now() + 300, - ); - await ctx.listenNUntil( - "listen-batch-until", - "order:ready", - 2, - Date.now() + 400, - ); + await ctx.queue.next("listen-order-created", { + names: [QUEUE_ORDER_CREATED], + count: 1, + }); + await ctx.queue.next("listen-order-updated-timeout", { + names: [QUEUE_ORDER_UPDATED], + timeout: 250, + }); + await ctx.queue.next("listen-batch-two", { + names: [QUEUE_ORDER_ITEM], + count: 2, + }); + await ctx.queue.next("listen-artifacts-timeout", { + names: [QUEUE_ORDER_ARTIFACT], + count: 3, + timeout: 300, + }); + await ctx.queue.next("listen-optional", { + names: [QUEUE_ORDER_OPTIONAL], + timeout: 200, + }); + await ctx.queue.next("listen-until", { + names: [QUEUE_ORDER_READY], + timeout: 300, + }); + await ctx.queue.next("listen-batch-until", { + names: [QUEUE_ORDER_READY], + count: 2, + timeout: 400, + }); await ctx.join("join-dependencies", { inventory: { diff --git a/examples/sandbox-vercel/src/actors/workflow/workflow-fixtures.ts b/examples/sandbox-vercel/src/actors/workflow/workflow-fixtures.ts index 8d7e06e951..3deb92001f 100644 --- a/examples/sandbox-vercel/src/actors/workflow/workflow-fixtures.ts +++ b/examples/sandbox-vercel/src/actors/workflow/workflow-fixtures.ts @@ -1,5 +1,5 @@ -import { actor } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { actor, queue } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; const WORKFLOW_GUARD_KV_KEY = "__rivet_actor_workflow_guard_triggered"; @@ -50,18 +50,25 @@ export const workflowQueueActor = actor({ state: { received: [] as unknown[], }, + queues: { + [WORKFLOW_QUEUE_NAME]: queue(), + }, run: workflow(async (ctx) => { await ctx.loop({ name: "queue", run: async (loopCtx) => { const actorLoopCtx = loopCtx as any; - const message = await loopCtx.listen( - "queue-wait", - WORKFLOW_QUEUE_NAME, - ); + const [message] = await loopCtx.queue.next("queue-wait", { + names: [WORKFLOW_QUEUE_NAME], + completable: true, + }); + if (!message || !message.complete) { + return Loop.continue(undefined); + } + const complete = message.complete; await loopCtx.step("store-message", async () => { actorLoopCtx.state.received.push(message.body); - await message.complete({ echo: message.body }); + await complete({ echo: message.body }); }); return Loop.continue(undefined); }, @@ -97,4 +104,4 @@ export const workflowSleepActor = actor({ }, }); -export { WORKFLOW_QUEUE_NAME, workflowQueueName }; +export { WORKFLOW_QUEUE_NAME }; diff --git a/examples/sandbox/src/actors/workflow/approval.ts b/examples/sandbox/src/actors/workflow/approval.ts index 4c7cc97d74..86070ec7eb 100644 --- a/examples/sandbox/src/actors/workflow/approval.ts +++ b/examples/sandbox/src/actors/workflow/approval.ts @@ -1,9 +1,9 @@ -// APPROVAL REQUEST (Listen Demo) -// Demonstrates: Message listening with timeout for approval workflows +// APPROVAL REQUEST (Queue Wait Demo) +// Demonstrates: Queue waits with timeout for approval workflows // One actor per approval request - actor key is the request ID import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; export type RequestStatus = "pending" | "approved" | "rejected" | "timeout"; @@ -21,7 +21,7 @@ export type ApprovalRequest = { type State = ApprovalRequest; -const QUEUE_DECISION = workflowQueueName("decision"); +const QUEUE_DECISION = "decision" as const; const APPROVAL_TIMEOUT_MS = 30000; @@ -44,7 +44,7 @@ export const approval = actor({ createdAt: Date.now(), }), queues: { - [QUEUE_DECISION]: queue(), + decision: queue(), }, events: { requestUpdated: event(), @@ -84,11 +84,14 @@ export const approval = actor({ c.broadcast("requestCreated", c.state); }); - const decision = await loopCtx.listenWithTimeout( + const [decisionMessage] = await loopCtx.queue.next( "wait-decision", - "decision", - APPROVAL_TIMEOUT_MS, + { + names: [QUEUE_DECISION], + timeout: APPROVAL_TIMEOUT_MS, + }, ); + const decision = decisionMessage?.body ?? null; await loopCtx.step("update-status", async () => { c.state.deciding = false; diff --git a/examples/sandbox/src/actors/workflow/dashboard.ts b/examples/sandbox/src/actors/workflow/dashboard.ts index fbe1bc68e7..449e38244f 100644 --- a/examples/sandbox/src/actors/workflow/dashboard.ts +++ b/examples/sandbox/src/actors/workflow/dashboard.ts @@ -2,7 +2,7 @@ // Demonstrates: Parallel data fetching with join (wait-all) import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; export type UserStats = { @@ -45,7 +45,7 @@ export type DashboardState = { type State = DashboardState; -const QUEUE_REFRESH = workflowQueueName("refresh"); +const QUEUE_REFRESH = "refresh"; type RefreshMessage = Record; async function fetchUserStats(): Promise { @@ -119,7 +119,9 @@ export const dashboard = actor({ run: async (loopCtx) => { const c = actorCtx(loopCtx); - await loopCtx.listen("wait-refresh", "refresh"); + await loopCtx.queue.next("wait-refresh", { + names: [QUEUE_REFRESH], + }); ctx.log.info({ msg: "starting dashboard refresh" }); diff --git a/examples/sandbox/src/actors/workflow/history-examples.ts b/examples/sandbox/src/actors/workflow/history-examples.ts index 9d03824998..b70a1f1f19 100644 --- a/examples/sandbox/src/actors/workflow/history-examples.ts +++ b/examples/sandbox/src/actors/workflow/history-examples.ts @@ -1,5 +1,5 @@ import { actor, event, queue } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { Loop, workflow } from "rivetkit/workflow"; import { actorCtx } from "./_helpers.ts"; function delay(ms: number): Promise { @@ -250,12 +250,12 @@ export type WorkflowHistoryFullState = { completedAt?: number; }; -const QUEUE_ORDER_CREATED = workflowQueueName("order:created"); -const QUEUE_ORDER_UPDATED = workflowQueueName("order:updated"); -const QUEUE_ORDER_ITEM = workflowQueueName("order:item"); -const QUEUE_ORDER_ARTIFACT = workflowQueueName("order:artifact"); -const QUEUE_ORDER_READY = workflowQueueName("order:ready"); -const QUEUE_ORDER_OPTIONAL = workflowQueueName("order:optional"); +const QUEUE_ORDER_CREATED = "order:created"; +const QUEUE_ORDER_UPDATED = "order:updated"; +const QUEUE_ORDER_ITEM = "order:item"; +const QUEUE_ORDER_ARTIFACT = "order:artifact"; +const QUEUE_ORDER_READY = "order:ready"; +const QUEUE_ORDER_OPTIONAL = "order:optional"; type OrderCreatedMessage = { id: string }; type OrderUpdatedMessage = { id: string; status: string }; @@ -408,31 +408,36 @@ export const workflowHistoryFull = actor({ return { readyBy, readyBatchBy }; }); - await ctx.listenN("listen-order-created", "order:created", 1); - await ctx.listenWithTimeout( - "listen-order-updated-timeout", - "order:updated", - 250, - ); - await ctx.listenN("listen-batch-two", "order:item", 2); - await ctx.listenNWithTimeout( - "listen-artifacts-timeout", - "order:artifact", - 3, - 300, - ); - await ctx.listenWithTimeout("listen-optional", "order:optional", 200); - await ctx.listenUntil( - "listen-until", - "order:ready", - Date.now() + 300, - ); - await ctx.listenNUntil( - "listen-batch-until", - "order:ready", - 2, - Date.now() + 400, - ); + await ctx.queue.next("listen-order-created", { + names: [QUEUE_ORDER_CREATED], + count: 1, + }); + await ctx.queue.next("listen-order-updated-timeout", { + names: [QUEUE_ORDER_UPDATED], + timeout: 250, + }); + await ctx.queue.next("listen-batch-two", { + names: [QUEUE_ORDER_ITEM], + count: 2, + }); + await ctx.queue.next("listen-artifacts-timeout", { + names: [QUEUE_ORDER_ARTIFACT], + count: 3, + timeout: 300, + }); + await ctx.queue.next("listen-optional", { + names: [QUEUE_ORDER_OPTIONAL], + timeout: 200, + }); + await ctx.queue.next("listen-until", { + names: [QUEUE_ORDER_READY], + timeout: 300, + }); + await ctx.queue.next("listen-batch-until", { + names: [QUEUE_ORDER_READY], + count: 2, + timeout: 400, + }); await ctx.join("join-dependencies", { inventory: { diff --git a/examples/sandbox/src/actors/workflow/workflow-fixtures.ts b/examples/sandbox/src/actors/workflow/workflow-fixtures.ts index 8d7e06e951..3deb92001f 100644 --- a/examples/sandbox/src/actors/workflow/workflow-fixtures.ts +++ b/examples/sandbox/src/actors/workflow/workflow-fixtures.ts @@ -1,5 +1,5 @@ -import { actor } from "rivetkit"; -import { Loop, workflow, workflowQueueName } from "rivetkit/workflow"; +import { actor, queue } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; const WORKFLOW_GUARD_KV_KEY = "__rivet_actor_workflow_guard_triggered"; @@ -50,18 +50,25 @@ export const workflowQueueActor = actor({ state: { received: [] as unknown[], }, + queues: { + [WORKFLOW_QUEUE_NAME]: queue(), + }, run: workflow(async (ctx) => { await ctx.loop({ name: "queue", run: async (loopCtx) => { const actorLoopCtx = loopCtx as any; - const message = await loopCtx.listen( - "queue-wait", - WORKFLOW_QUEUE_NAME, - ); + const [message] = await loopCtx.queue.next("queue-wait", { + names: [WORKFLOW_QUEUE_NAME], + completable: true, + }); + if (!message || !message.complete) { + return Loop.continue(undefined); + } + const complete = message.complete; await loopCtx.step("store-message", async () => { actorLoopCtx.state.received.push(message.body); - await message.complete({ echo: message.body }); + await complete({ echo: message.body }); }); return Loop.continue(undefined); }, @@ -97,4 +104,4 @@ export const workflowSleepActor = actor({ }, }); -export { WORKFLOW_QUEUE_NAME, workflowQueueName }; +export { WORKFLOW_QUEUE_NAME }; diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts index 17e7d935b6..d74bf01a06 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts @@ -1,8 +1,8 @@ import { Loop } from "@rivetkit/workflow-engine"; -import { actor } from "@/actor/mod"; +import { actor, queue } from "@/actor/mod"; import { db } from "@/db/mod"; import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants"; -import { workflow, workflowQueueName } from "@/workflow/mod"; +import { workflow } from "@/workflow/mod"; import type { registry } from "./registry"; const WORKFLOW_QUEUE_NAME = "workflow-default"; @@ -52,18 +52,25 @@ export const workflowQueueActor = actor({ state: { received: [] as unknown[], }, + queues: { + [WORKFLOW_QUEUE_NAME]: queue(), + }, run: workflow(async (ctx) => { await ctx.loop({ name: "queue", run: async (loopCtx) => { const actorLoopCtx = loopCtx as any; - const message = await loopCtx.listen( - "queue-wait", - WORKFLOW_QUEUE_NAME, - ); + const [message] = await loopCtx.queue.next("queue-wait", { + names: [WORKFLOW_QUEUE_NAME], + completable: true, + }); + if (!message || !message.complete) { + return Loop.continue(undefined); + } + const complete = message.complete; await loopCtx.step("store-message", async () => { actorLoopCtx.state.received.push(message.body); - await message.complete({ echo: message.body }); + await complete({ echo: message.body }); }); return Loop.continue(undefined); }, @@ -75,7 +82,7 @@ export const workflowQueueActor = actor({ const client = c.client(); const handle = client.workflowQueueActor.getForId(c.actorId); return await handle.send( - workflowQueueName(WORKFLOW_QUEUE_NAME), + WORKFLOW_QUEUE_NAME, payload, { wait: true, timeout: 1_000 }, ); @@ -175,4 +182,4 @@ export const workflowSleepActor = actor({ }, }); -export { WORKFLOW_QUEUE_NAME, workflowQueueName }; +export { WORKFLOW_QUEUE_NAME }; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 83e35bdcc3..152d2c563e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -81,14 +81,62 @@ export const RunConfigSchema = z.object({ /** Inspector integration for long-running run handlers. */ inspector: RunInspectorConfigSchema.optional(), }); -export type RunConfig = z.infer; +type RunConfigRuntime = z.infer; +export type RunConfig< + TState = unknown, + TConnParams = unknown, + TConnState = unknown, + TVars = unknown, + TInput = unknown, + TDatabase extends AnyDatabaseProvider = AnyDatabaseProvider, + TEvents extends EventSchemaConfig = Record, + TQueues extends QueueSchemaConfig = Record, +> = Omit & { + run: ( + c: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ) => void | Promise; +}; + +type AnyRunConfig = RunConfig< + any, + any, + any, + any, + any, + AnyDatabaseProvider, + any, + any +>; + +export const RUN_FUNCTION_CONFIG_SYMBOL = Symbol.for( + "rivetkit.run_function_config", +); + +interface RunFunctionConfig { + name?: string; + icon?: string; + inspector?: RunInspectorConfig; +} + +type RunFunctionWithConfig = ((...args: any[]) => any) & { + [RUN_FUNCTION_CONFIG_SYMBOL]?: RunFunctionConfig; +}; // Run can be either a function or an object with name/icon/run const zRunHandler = z.union([zFunction(), RunConfigSchema]).optional(); /** Extract the run function from either a function or RunConfig object. */ export function getRunFunction( - run: ((...args: any[]) => any) | RunConfig | undefined, + run: ((...args: any[]) => any) | AnyRunConfig | undefined, ): ((...args: any[]) => any) | undefined { if (!run) return undefined; if (typeof run === "function") return run; @@ -97,17 +145,28 @@ export function getRunFunction( /** Extract run metadata (name/icon) from RunConfig if provided. */ export function getRunMetadata( - run: ((...args: any[]) => any) | RunConfig | undefined, + run: ((...args: any[]) => any) | AnyRunConfig | undefined, ): { name?: string; icon?: string } { - if (!run || typeof run === "function") return {}; + if (!run) return {}; + if (typeof run === "function") { + const config = (run as RunFunctionWithConfig)[ + RUN_FUNCTION_CONFIG_SYMBOL + ]; + if (!config) return {}; + return { name: config.name, icon: config.icon }; + } return { name: run.name, icon: run.icon }; } /** Extract run inspector configuration if provided. */ export function getRunInspectorConfig( - run: ((...args: any[]) => any) | RunConfig | undefined, + run: ((...args: any[]) => any) | AnyRunConfig | undefined, ): RunInspectorConfig | undefined { - if (!run || typeof run === "function") return undefined; + if (!run) return undefined; + if (typeof run === "function") { + return (run as RunFunctionWithConfig)[RUN_FUNCTION_CONFIG_SYMBOL] + ?.inspector; + } return run.inspector; } @@ -467,7 +526,16 @@ interface BaseActorConfig< TQueues >, ) => void | Promise) - | RunConfig; + | RunConfig< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; /** * Called when the actor's state changes. diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts index d21cfa3ad5..5a2fc5a777 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts @@ -35,7 +35,7 @@ interface QueueWaiter { } interface MessageListener { - nameSet: Set; + nameSet?: Set; resolve: () => void; reject: (error: Error) => void; actorAbortCleanup?: () => void; @@ -345,12 +345,17 @@ export class QueueManager< } async waitForNames( - names: string[], + names: readonly string[] | undefined, abortSignal?: AbortSignal, ): Promise { - const nameSet = new Set(names); + const nameSet = + names && names.length > 0 ? new Set(names) : undefined; const existing = await this.#loadQueueMessages(); - if (existing.some((message) => nameSet.has(message.name))) { + if (nameSet) { + if (existing.some((message) => nameSet.has(message.name))) { + return; + } + } else if (existing.length > 0) { return; } @@ -500,7 +505,7 @@ export class QueueManager< return; } for (const listener of [...this.#messageListeners]) { - if (!listener.nameSet.has(name)) { + if (listener.nameSet && !listener.nameSet.has(name)) { continue; } this.#removeMessageListener(listener); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts index 4e834fb83b..65eaace13f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts @@ -17,8 +17,8 @@ export type QueueMessageOf = Omit< body: Body; }; -type QueueName = keyof TQueues & string; -type QueueFilterName = +export type QueueName = keyof TQueues & string; +export type QueueFilterName = keyof TQueues extends never ? string : QueueName; type QueueMessageForName< @@ -54,7 +54,7 @@ type QueueCompletableMessageForName< ): Promise; }; -type QueueResultMessageForName< +export type QueueResultMessageForName< TQueues extends QueueSchemaConfig, TName extends QueueFilterName, TCompletable extends boolean, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts index b0e5078db2..37a8472e6e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts @@ -1,7 +1,6 @@ import { describe, expect, test } from "vitest"; import { WORKFLOW_QUEUE_NAME, - workflowQueueName, } from "../../../fixtures/driver-test-suite/workflow"; import type { DriverTestConfig } from "../mod"; import { setupDriverTest, waitFor } from "../utils"; @@ -21,11 +20,11 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) { expect(state.guardTriggered).toBe(true); }); - test("consumes queue messages via workflow listen", async (c) => { + test("consumes queue messages via workflow queue.next", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const actor = client.workflowQueueActor.getOrCreate(["workflow-queue"]); - await actor.send(workflowQueueName(WORKFLOW_QUEUE_NAME), { + await actor.send(WORKFLOW_QUEUE_NAME, { hello: "world", }); @@ -34,7 +33,7 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) { expect(messages).toEqual([{ hello: "world" }]); }); - test("workflow listen supports completing wait sends", async (c) => { + test("workflow queue.next supports completing wait sends", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const actor = client.workflowQueueActor.getOrCreate([ "workflow-queue-wait", diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts index af39f3cc1b..89d67bc30b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts @@ -2,6 +2,17 @@ import type { RunContext } from "@/actor/contexts/run"; import type { Client } from "@/client/client"; import type { Registry } from "@/registry"; import type { AnyDatabaseProvider, InferDatabaseClient } from "@/actor/database"; +import type { + QueueFilterName, + QueueNextOptions, + QueueResultMessageForName, +} from "@/actor/instance/queue"; +import type { + EventSchemaConfig, + InferEventArgs, + InferSchemaMap, + QueueSchemaConfig, +} from "@/actor/schema"; import type { WorkflowContextInterface } from "@rivetkit/workflow-engine"; import type { BranchConfig, @@ -10,10 +21,72 @@ import type { LoopConfig, LoopResult, StepConfig, - WorkflowListenMessage, + WorkflowQueueMessage, } from "@rivetkit/workflow-engine"; import { WORKFLOW_GUARD_KV_KEY } from "./constants"; +type WorkflowActorQueueNextOptions< + TName extends string, + TCompletable extends boolean, +> = Omit, "signal">; + +type WorkflowActorQueueNextOptionsFallback = Omit< + QueueNextOptions, + "signal" +>; + +type ActorWorkflowLoopConfig< + S, + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = Omit, "run"> & { + run: ( + ctx: ActorWorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + state: S, + ) => Promise>; +}; + +type ActorWorkflowBranchConfig< + TOutput, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = { + run: ( + ctx: ActorWorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ) => Promise; +}; + export class ActorWorkflowContext< TState, TConnParams, @@ -21,17 +94,37 @@ export class ActorWorkflowContext< TVars, TInput, TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig = Record, + TQueues extends QueueSchemaConfig = Record, > implements WorkflowContextInterface { #inner: WorkflowContextInterface; - #runCtx: RunContext; + #runCtx: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; #actorAccessDepth = 0; #allowActorAccess = false; #guardViolation = false; constructor( inner: WorkflowContextInterface, - runCtx: RunContext, + runCtx: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ) { this.#inner = inner; this.#runCtx = runCtx; @@ -45,39 +138,136 @@ export class ActorWorkflowContext< return this.#inner.abortSignal; } - async step( - nameOrConfig: string | Parameters[0], - run?: () => Promise, - ): Promise { + get queue() { + const self = this; + function next< + const TName extends QueueFilterName, + const TCompletable extends boolean = false, + >( + name: string, + opts?: WorkflowActorQueueNextOptions, + ): Promise>>; + function next( + name: string, + opts?: WorkflowActorQueueNextOptionsFallback, + ): Promise< + Array< + QueueResultMessageForName< + TQueues, + QueueFilterName, + TCompletable + > + > + >; + async function next( + name: string, + opts?: WorkflowActorQueueNextOptions, + ): Promise>> { + const messages = await self.#inner.queue.next(name, opts); + return messages.map((message) => self.#toActorQueueMessage(message)); + } + + function send( + name: K, + body: InferSchemaMap[K], + ): Promise; + function send( + name: keyof TQueues extends never ? string : never, + body: unknown, + ): Promise; + async function send(name: string, body: unknown): Promise { + await self.#runCtx.queue.send(name as never, body as never); + } + + return { + next, + send, + }; + } + + async step( + nameOrConfig: string | Parameters[0], + run?: () => Promise, + ): Promise { if (typeof nameOrConfig === "string") { if (!run) { throw new Error("Step run function missing"); } - return await this.#wrapActive(() => - this.#inner.step(nameOrConfig, () => - this.#withActorAccess(run), - ), - ); - } - const stepConfig = nameOrConfig as StepConfig; - const config: StepConfig = { - ...stepConfig, - run: () => this.#withActorAccess(stepConfig.run), - }; - return await this.#wrapActive(() => this.#inner.step(config)); + return await this.#wrapActive(() => + this.#inner.step(nameOrConfig, () => this.#withActorAccess(run)), + ); } + const stepConfig = nameOrConfig as StepConfig; + const config: StepConfig = { + ...stepConfig, + run: () => this.#withActorAccess(stepConfig.run), + }; + return await this.#wrapActive(() => this.#inner.step(config)); + } + async loop( + name: string, + run: ( + ctx: ActorWorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ) => Promise>, + ): Promise; async loop( name: string, run: ( ctx: WorkflowContextInterface, ) => Promise>, ): Promise; + async loop( + config: ActorWorkflowLoopConfig< + S, + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise; async loop(config: LoopConfig): Promise; async loop( - nameOrConfig: string | LoopConfig, + nameOrConfig: + | string + | LoopConfig + | ActorWorkflowLoopConfig< + unknown, + unknown, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, run?: ( - ctx: WorkflowContextInterface, + ctx: ActorWorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ) => Promise>, ): Promise { if (typeof nameOrConfig === "string") { @@ -106,68 +296,34 @@ export class ActorWorkflowContext< return this.#inner.sleepUntil(name, timestampMs); } - listen( - name: string, - messageName: string | string[], - ): Promise> { - return this.#inner.listen(name, messageName); - } - - listenN( - name: string, - messageName: string, - limit: number, - ): Promise { - return this.#inner.listenN(name, messageName, limit); - } - - listenWithTimeout( - name: string, - messageName: string, - timeoutMs: number, - ): Promise { - return this.#inner.listenWithTimeout(name, messageName, timeoutMs); - } - - listenUntil( - name: string, - messageName: string, - timestampMs: number, - ): Promise { - return this.#inner.listenUntil(name, messageName, timestampMs); - } - - listenNWithTimeout( - name: string, - messageName: string, - limit: number, - timeoutMs: number, - ): Promise { - return this.#inner.listenNWithTimeout( - name, - messageName, - limit, - timeoutMs, - ); - } - - listenNUntil( - name: string, - messageName: string, - limit: number, - timestampMs: number, - ): Promise { - return this.#inner.listenNUntil(name, messageName, limit, timestampMs); - } - async rollbackCheckpoint(name: string): Promise { await this.#wrapActive(() => this.#inner.rollbackCheckpoint(name)); } + async join< + T extends Record< + string, + ActorWorkflowBranchConfig< + unknown, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + > + >, + >( + name: string, + branches: T, + ): Promise<{ [K in keyof T]: Awaited> }>; async join>>( name: string, branches: T, - ): Promise<{ [K in keyof T]: BranchOutput }> { + ): Promise<{ [K in keyof T]: BranchOutput }>; + async join(name: string, branches: Record>) { const wrappedBranches = Object.fromEntries( Object.entries(branches).map(([key, branch]) => [ key, @@ -176,13 +332,30 @@ export class ActorWorkflowContext< branch.run(this.#createChildContext(ctx)), }, ]), - ) as T; - - return (await this.#wrapActive(() => + ) as Record>; + return await this.#wrapActive(() => this.#inner.join(name, wrappedBranches), - )) as { [K in keyof T]: BranchOutput }; + ); } + async race( + name: string, + branches: Array<{ + name: string; + run: ( + ctx: ActorWorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ) => Promise; + }>, + ): Promise<{ winner: string; value: T }>; async race( name: string, branches: Array<{ @@ -246,8 +419,37 @@ export class ActorWorkflowContext< return this.#runCtx.actorId; } - broadcast>(name: string, ...args: Args): void { - this.#runCtx.broadcast(name, ...args); + broadcast( + name: K, + ...args: InferEventArgs[K]> + ): void; + broadcast( + name: keyof TEvents extends never ? string : never, + ...args: Array + ): void; + broadcast(name: string, ...args: Array): void { + this.#runCtx.broadcast( + name as never, + ...((args as unknown[]) as never[]), + ); + } + + #toActorQueueMessage( + message: WorkflowQueueMessage, + ): WorkflowQueueMessage & { id: bigint } { + let id: bigint; + try { + id = BigInt(message.id); + } catch { + throw new Error(`Invalid queue message id "${message.id}"`); + } + return { + id, + name: message.name, + body: message.body, + createdAt: message.createdAt, + ...(message.complete ? { complete: message.complete } : {}), + }; } async #wrapActive(run: () => Promise): Promise { @@ -321,7 +523,9 @@ export class ActorWorkflowContext< TConnState, TVars, TInput, - TDatabase + TDatabase, + TEvents, + TQueues > { return new ActorWorkflowContext(ctx, this.#runCtx); } diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts index f262ec5c9d..3213597adb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts @@ -9,72 +9,66 @@ import type { WorkflowMessageDriver, } from "@rivetkit/workflow-engine"; -const WORKFLOW_QUEUE_PREFIX = "__workflow:"; - -export function workflowQueueName(name: string): string { - return `${WORKFLOW_QUEUE_PREFIX}${name}`; -} - -function stripWorkflowQueueName(name: string): string | null { - if (!name.startsWith(WORKFLOW_QUEUE_PREFIX)) { - return null; - } - return name.slice(WORKFLOW_QUEUE_PREFIX.length); -} - function stripWorkflowKey(prefixed: Uint8Array): Uint8Array { return prefixed.slice(KEYS.WORKFLOW_PREFIX.length); } class ActorWorkflowMessageDriver implements WorkflowMessageDriver { #actor: AnyActorInstance; - #runCtx: RunContext; - #completionHandles = new Map Promise>(); + #runCtx: RunContext; constructor( actor: AnyActorInstance, - runCtx: RunContext, + runCtx: RunContext, ) { this.#actor = actor; this.#runCtx = runCtx; } async loadMessages(): Promise { - const queueMessages = await this.#runCtx.keepAwake( - this.#actor.queueManager.getMessages(), - ); - - const workflowMessages: Message[] = []; - for (const queueMessage of queueMessages) { - const workflowName = stripWorkflowQueueName(queueMessage.name); - if (!workflowName) continue; - const id = queueMessage.id.toString(); - this.#completionHandles.set(id, async (response?: unknown) => { - await this.#runCtx.keepAwake( - this.#actor.queueManager.completeMessage(queueMessage, response), - ); - }); - workflowMessages.push({ - id, - name: workflowName, - data: queueMessage.body, - sentAt: queueMessage.createdAt, - complete: async (response?: unknown) => { - await this.completeMessage(id, response); - }, - }); - } - - return workflowMessages; + // Actor-backed workflows use receiveMessages() directly and do not + // mirror queue messages into workflow-engine storage. + return []; } async addMessage(message: Message): Promise { await this.#runCtx.keepAwake( - this.#actor.queueManager.enqueue( - workflowQueueName(message.name), - message.data, + this.#actor.queueManager.enqueue(message.name, message.data), + ); + } + + async receiveMessages(opts: { + names?: readonly string[]; + count: number; + completable: boolean; + }): Promise { + const messages = await this.#runCtx.keepAwake( + this.#actor.queueManager.receive( + opts.names && opts.names.length > 0 ? [...opts.names] : undefined, + opts.count, + 0, + undefined, + opts.completable, ), ); + return messages.map((message) => ({ + id: message.id.toString(), + name: message.name, + data: message.body, + sentAt: message.createdAt, + ...(opts.completable + ? { + complete: async (response?: unknown) => { + await this.#runCtx.keepAwake( + this.#actor.queueManager.completeMessage( + message, + response, + ), + ); + }, + } + : {}), + })); } async deleteMessages(messageIds: string[]): Promise { @@ -105,13 +99,6 @@ class ActorWorkflowMessageDriver implements WorkflowMessageDriver { } async completeMessage(messageId: string, response?: unknown): Promise { - const complete = this.#completionHandles.get(messageId); - if (complete) { - await complete(response); - this.#completionHandles.delete(messageId); - return; - } - let parsedId: bigint; try { parsedId = BigInt(messageId); @@ -129,21 +116,17 @@ export class ActorWorkflowDriver implements EngineDriver { readonly workerPollInterval = 100; readonly messageDriver: WorkflowMessageDriver; #actor: AnyActorInstance; - #runCtx: RunContext; + #runCtx: RunContext; constructor( actor: AnyActorInstance, - runCtx: RunContext, + runCtx: RunContext, ) { this.#actor = actor; this.#runCtx = runCtx; this.messageDriver = new ActorWorkflowMessageDriver(actor, runCtx); } - #log(msg: string, data?: Record) { - this.#runCtx.log.info({ msg: `[workflow-driver] ${msg}`, ...data }); - } - async get(key: Uint8Array): Promise { const [value] = await this.#runCtx.keepAwake( this.#actor.driver.kvBatchGet(this.#actor.id, [ @@ -231,7 +214,9 @@ export class ActorWorkflowDriver implements EngineDriver { messageNames: string[], abortSignal: AbortSignal, ): Promise { - const queueNames = messageNames.map((name) => workflowQueueName(name)); - return this.#actor.queueManager.waitForNames(queueNames, abortSignal); + return this.#actor.queueManager.waitForNames( + messageNames.length > 0 ? messageNames : undefined, + abortSignal, + ); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts index 75bb637e28..23ef812b8c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts @@ -2,16 +2,16 @@ import { ACTOR_CONTEXT_INTERNAL_SYMBOL } from "@/actor/contexts/base/actor"; import type { RunContext } from "@/actor/contexts/run"; import type { AnyDatabaseProvider } from "@/actor/database"; import type { AnyActorInstance } from "@/actor/instance/mod"; -import type { RunConfig } from "@/actor/config"; +import type { EventSchemaConfig, QueueSchemaConfig } from "@/actor/schema"; +import { RUN_FUNCTION_CONFIG_SYMBOL } from "@/actor/config"; import { stringifyError } from "@/utils"; import { runWorkflow } from "@rivetkit/workflow-engine"; import invariant from "invariant"; import { ActorWorkflowContext } from "./context"; -import { ActorWorkflowDriver, workflowQueueName } from "./driver"; +import { ActorWorkflowDriver } from "./driver"; import { createWorkflowInspectorAdapter } from "./inspector"; export { Loop } from "@rivetkit/workflow-engine"; -export { workflowQueueName } from "./driver"; export { ActorWorkflowContext } from "./context"; export function workflow< @@ -21,6 +21,8 @@ export function workflow< TVars, TInput, TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig = Record, + TQueues extends QueueSchemaConfig = Record, >( fn: ( ctx: ActorWorkflowContext< @@ -29,22 +31,37 @@ export function workflow< TConnState, TVars, TInput, - TDatabase + TDatabase, + TEvents, + TQueues >, ) => Promise, -): RunConfig { +): ( + c: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, +) => Promise { const workflowInspector = createWorkflowInspectorAdapter(); async function run( runCtx: RunContext< TState, TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ): Promise { + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise { const actor = ( runCtx as unknown as { [ACTOR_CONTEXT_INTERNAL_SYMBOL]?: AnyActorInstance; @@ -92,9 +109,16 @@ export function workflow< }); } - return { + const runWithConfig = run as typeof run & { + [RUN_FUNCTION_CONFIG_SYMBOL]?: { + icon?: string; + inspector?: { workflow: typeof workflowInspector.adapter }; + }; + }; + runWithConfig[RUN_FUNCTION_CONFIG_SYMBOL] = { icon: "diagram-project", - run: run as RunConfig["run"], inspector: { workflow: workflowInspector.adapter }, }; + + return runWithConfig; } diff --git a/rivetkit-typescript/packages/rivetkit/tests/actor-types.test.ts b/rivetkit-typescript/packages/rivetkit/tests/actor-types.test.ts index 6eee81fdb7..780670681f 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/actor-types.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/actor-types.test.ts @@ -3,6 +3,7 @@ import { actor, event, queue } from "@/actor/mod"; import type { ActorContext, ActorContextOf } from "@/actor/contexts"; import type { ActorDefinition } from "@/actor/definition"; import type { DatabaseProviderContext } from "@/db/config"; +import { workflow } from "@/workflow/mod"; describe("ActorDefinition", () => { describe("schema config types", () => { @@ -171,4 +172,71 @@ describe("ActorDefinition", () => { expectTypeOf().toEqualTypeOf<{ input: string }>(); }); }); + + describe("workflow context type inference", () => { + it("infers queue and event types for workflow ctx", () => { + actor({ + state: {}, + queues: { + foo: queue<{ fooBody: string }>(), + bar: queue<{ barBody: number }>(), + }, + events: { + updated: event<{ count: number }>(), + pair: event<[number, string]>(), + }, + run: workflow(async (ctx) => { + const [single] = await ctx.queue.next("wait-single", { + names: ["foo"] as const, + }); + if (single && single.name === "foo") { + expectTypeOf(single.body).toEqualTypeOf<{ + fooBody: string; + }>(); + } + + const [union] = await ctx.queue.next("wait-union", { + names: ["foo", "bar"], + }); + if (union?.name === "foo") { + expectTypeOf(union.body).toEqualTypeOf<{ + fooBody: string; + }>(); + } + if (union?.name === "bar") { + expectTypeOf(union.body).toEqualTypeOf<{ + barBody: number; + }>(); + } + + ctx.broadcast("updated", { count: 1 }); + ctx.broadcast("pair", 1, "ok"); + // @ts-expect-error wrong payload shape + ctx.broadcast("updated", { count: "no" }); + // @ts-expect-error unknown event name + ctx.broadcast("missing", { count: 1 }); + }), + actions: {}, + }); + }); + + it("does not require explicit queue.next body generic for single-queue actors", () => { + type Decision = { approved: boolean; approver: string }; + actor({ + state: {}, + queues: { + decision: queue(), + }, + run: workflow(async (ctx) => { + const [message] = await ctx.queue.next("wait-decision", { + names: ["decision"], + }); + if (message) { + expectTypeOf(message.body).toEqualTypeOf(); + } + }), + actions: {}, + }); + }); + }); }); diff --git a/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md b/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md index 5875d2f870..f0ad053745 100644 --- a/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md +++ b/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md @@ -28,7 +28,9 @@ async function orderWorkflow(ctx: WorkflowContextInterface, orderId: string) { }); // Wait for shipping confirmation (external message) - const tracking = await ctx.listen("wait-shipping", "shipment-confirmed"); + const [tracking] = await ctx.queue.next("wait-shipping", { + names: ["shipment-confirmed"], + }); // Send notification await ctx.step("notify-customer", async () => { @@ -213,46 +215,34 @@ await ctx.sleepUntil("wait-midnight", midnightTimestamp); Short sleeps (< `driver.workerPollInterval`) wait in memory. Longer sleeps yield to the scheduler and set an alarm for wake-up. -### Messages (Listen) +### Queue Messages Wait for external events delivered via `handle.message()`. ```typescript // Wait for a single message -const data = await ctx.listen("payment", "payment-completed"); +const [data] = await ctx.queue.next("payment", { + names: ["payment-completed"], +}); // Wait for N messages -const items = await ctx.listenN("batch", "item-added", 10); - -// Wait with timeout (returns null on timeout) -const result = await ctx.listenWithTimeout( - "api-response", - "response-received", - 30000 -); +const items = await ctx.queue.next("batch", { + names: ["item-added"], + count: 10, +}); -// Wait until timestamp (returns null on timeout) -const result = await ctx.listenUntil( - "api-response", - "response-received", - deadline -); +// Wait with timeout (returns [] on timeout) +const result = await ctx.queue.next("api-response", { + names: ["response-received"], + timeout: 30000, +}); // Wait for up to N messages with timeout -const items = await ctx.listenNWithTimeout( - "batch", - "item-added", - 10, // max items - 60000 // timeout ms -); - -// Wait for up to N messages until timestamp -const items = await ctx.listenNUntil( - "batch", - "item-added", - 10, // max items - deadline // timestamp -); +const timedBatch = await ctx.queue.next("batch-timeout", { + names: ["item-added"], + count: 10, + timeout: 60000, +}); ``` **Message delivery:** Messages are loaded once at workflow start. If a message is sent during execution, the workflow yields and picks it up on the next run. @@ -429,7 +419,7 @@ type WorkflowState = ## Best Practices -1. **Unique step names** - Each step/loop/sleep/listen within a scope must have a unique name +1. **Unique step names** - Each step/loop/sleep/queue.next within a scope must have a unique name 2. **Deterministic code** - Workflow code outside of steps must be deterministic. Don't use `Math.random()`, `Date.now()`, or read external state outside steps. diff --git a/rivetkit-typescript/packages/workflow-engine/architecture.md b/rivetkit-typescript/packages/workflow-engine/architecture.md index 59e2f6f9a6..93e60c2fc3 100644 --- a/rivetkit-typescript/packages/workflow-engine/architecture.md +++ b/rivetkit-typescript/packages/workflow-engine/architecture.md @@ -143,7 +143,7 @@ This optimization reduces storage size when the same names appear many times. ┌─────────────────────────────────────────────────────────────┐ │ WorkflowContextImpl │ │ Implements WorkflowContext interface │ -│ - step(), loop(), sleep(), listen(), join(), race() │ +│ - step(), loop(), sleep(), queue.next(), join(), race() │ │ - Manages current location │ │ - Creates branch contexts for parallel execution │ └─────────────────────────────────────────────────────────────┘ @@ -202,7 +202,7 @@ This optimization reduces storage size when the same names appear many times. ### Sleep/Message Yielding ``` -1. ctx.sleep() or ctx.listen() called +1. ctx.sleep() or ctx.queue.next() called 2. Check if deadline passed or message available (in memory) 3. If not ready: a. Throw SleepError or MessageWaitError diff --git a/rivetkit-typescript/packages/workflow-engine/docs/control-flow.md b/rivetkit-typescript/packages/workflow-engine/docs/control-flow.md index 66625d5012..d51e1bab16 100644 --- a/rivetkit-typescript/packages/workflow-engine/docs/control-flow.md +++ b/rivetkit-typescript/packages/workflow-engine/docs/control-flow.md @@ -74,17 +74,19 @@ const { winner, value } = await ctx.race("timeout", [ ## Messages in Control Flow -`ctx.listen()` and its variants pause the workflow until a message arrives. Message names are part of history, so keep them stable and unique. +`ctx.queue.next()` pauses the workflow until matching messages arrive. Queue wait names are part of history, so keep them stable and unique. ```ts -const approval = await ctx.listen("approval", "approval-granted"); +const [approval] = await ctx.queue.next("approval", { + names: ["approval-granted"], +}); ``` Messages are loaded at workflow start. If a message arrives during execution, the workflow yields and picks it up on the next run. ## Best Practices -- Use stable names for steps, loops, joins, races, and listens. +- Use stable names for steps, loops, joins, races, and queue waits. - Keep all nondeterministic work inside steps. - Use loop state to avoid native `while`/`for` loops. - Handle cancellation via `ctx.abortSignal` in long-running branches. @@ -92,4 +94,4 @@ Messages are loaded at workflow start. If a message arrives during execution, th ## Related - `rivetkit-typescript/packages/workflow-engine/QUICKSTART.md:155` for loop usage. -- `rivetkit-typescript/packages/workflow-engine/QUICKSTART.md:207` for message waits. +- `rivetkit-typescript/packages/workflow-engine/QUICKSTART.md:207` for queue waits. diff --git a/rivetkit-typescript/packages/workflow-engine/docs/long-running-workflows.md b/rivetkit-typescript/packages/workflow-engine/docs/long-running-workflows.md index f52ee553a6..b0bce652df 100644 --- a/rivetkit-typescript/packages/workflow-engine/docs/long-running-workflows.md +++ b/rivetkit-typescript/packages/workflow-engine/docs/long-running-workflows.md @@ -4,11 +4,13 @@ Long-running workflows can pause, sleep, and resume across process restarts. Thi ## Yielding Execution -Use sleep and listen helpers to yield control while waiting: +Use sleep and queue helpers to yield control while waiting: ```ts await ctx.sleep("wait-5-min", 5 * 60 * 1000); -const message = await ctx.listen("wait-approval", "approval"); +const [message] = await ctx.queue.next("wait-approval", { + names: ["approval"], +}); ``` When a workflow yields, `runWorkflow` returns a `WorkflowResult` with `state: "sleeping"`. The driver alarm or a message wake-up triggers the next run. diff --git a/rivetkit-typescript/packages/workflow-engine/docs/waiting-for-events-and-human-in-the-loop.md b/rivetkit-typescript/packages/workflow-engine/docs/waiting-for-events-and-human-in-the-loop.md index 4e8b751c5c..edfa100266 100644 --- a/rivetkit-typescript/packages/workflow-engine/docs/waiting-for-events-and-human-in-the-loop.md +++ b/rivetkit-typescript/packages/workflow-engine/docs/waiting-for-events-and-human-in-the-loop.md @@ -8,43 +8,49 @@ Workflows can pause until external events arrive. This enables human approvals, - Messages are loaded at workflow start. - If a message arrives while the workflow is running, the workflow yields and picks it up on the next run. -In live mode (`runWorkflow(..., { mode: "live" })`), incoming messages can also wake a workflow waiting on `ctx.listen()`. +In live mode (`runWorkflow(..., { mode: "live" })`), incoming messages can also wake a workflow waiting on `ctx.queue.next()`. -## Listening for Messages +## Waiting for Queue Messages ```ts -const approval = await ctx.listen("wait-approval", "approval-granted"); +const [approval] = await ctx.queue.next("wait-approval", { + names: ["approval-granted"], +}); ``` -Use the `listen*` variants to wait for multiple messages or apply timeouts: +Use `count` and `timeout` to wait for batches or apply deadlines: ```ts -const items = await ctx.listenN("batch", "item-added", 10); -const result = await ctx.listenWithTimeout("approval", "approval-granted", 60000); +const items = await ctx.queue.next("batch", { + names: ["item-added"], + count: 10, +}); +const result = await ctx.queue.next("approval", { + names: ["approval-granted"], + timeout: 60000, +}); ``` ## Deadlines and Timeouts -Use `listenUntil` or `listenWithTimeout` to model approval windows: +Use `timeout` to model approval windows: ```ts -const approval = await ctx.listenUntil( - "approval-window", - "approval-granted", - Date.now() + 24 * 60 * 60 * 1000, -); +const [approval] = await ctx.queue.next("approval-window", { + names: ["approval-granted"], + timeout: 24 * 60 * 60 * 1000, +}); ``` -If the deadline passes, the method returns `null` instead of throwing. +If the deadline passes, `ctx.queue.next(...)` returns `[]`. ## Human-in-the-Loop Example ```ts -const approval = await ctx.listenWithTimeout( - "manual-approval", - "approval-granted", - 30 * 60 * 1000, -); +const [approval] = await ctx.queue.next("manual-approval", { + names: ["approval-granted"], + timeout: 30 * 60 * 1000, +}); if (!approval) { await ctx.step("notify-timeout", () => sendTimeoutNotice()); @@ -62,5 +68,5 @@ await ctx.step("proceed", () => runApprovedWork()); ## Related -- `rivetkit-typescript/packages/workflow-engine/QUICKSTART.md:207` for listen helpers. +- `rivetkit-typescript/packages/workflow-engine/QUICKSTART.md:207` for queue waits. - `rivetkit-typescript/packages/workflow-engine/architecture.md:218` for message delivery details. diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index d6e8d8a47b..3bb29e0fc1 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -24,14 +24,13 @@ import { registerName, } from "./location.js"; import { - consumeMessage, consumeMessages, createEntry, deleteEntriesWithPrefix, flush, - getEntry, getOrCreateMetadata, loadMetadata, + peekMessages, setEntry, } from "./storage.js"; import type { @@ -49,7 +48,9 @@ import type { StepConfig, Storage, WorkflowContextInterface, - WorkflowListenMessage, + WorkflowQueue, + WorkflowQueueMessage, + WorkflowQueueNextOptions, WorkflowMessageDriver, } from "./types.js"; import { sleep } from "./utils.js"; @@ -66,7 +67,7 @@ export const DEFAULT_LOOP_HISTORY_EVERY = 20; export const DEFAULT_LOOP_HISTORY_KEEP = 20; export const DEFAULT_STEP_TIMEOUT = 30000; // 30 seconds -const LISTEN_HISTORY_MESSAGE_MARKER = "__rivetWorkflowListenMessage"; +const QUEUE_HISTORY_MESSAGE_MARKER = "__rivetWorkflowQueueMessage"; /** * Calculate backoff delay with exponential backoff. @@ -113,6 +114,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { private rollbackCheckpointSet: boolean; /** Track names used in current execution to detect duplicates */ private usedNamesInExecution = new Set(); + private pendingCompletableMessageIds = new Set(); private historyNotifier?: () => void; private logger?: Logger; @@ -142,6 +144,13 @@ export class WorkflowContextImpl implements WorkflowContextInterface { return this.abortController.signal; } + get queue(): WorkflowQueue { + return { + next: async (name, opts) => await this.queueNext(name, opts), + send: async (name, body) => await this.queueSend(name, body), + }; + } + isEvicted(): boolean { return this.abortSignal.aborted; } @@ -206,12 +215,12 @@ export class WorkflowContextImpl implements WorkflowContextInterface { private checkDuplicateName(name: string): void { const fullKey = locationToKey(this.storage, this.currentLocation) + "/" + name; - if (this.usedNamesInExecution.has(fullKey)) { - throw new HistoryDivergedError( - `Duplicate entry name "${name}" at location "${locationToKey(this.storage, this.currentLocation)}". ` + - `Each step/loop/sleep/listen/join/race must have a unique name within its scope.`, - ); - } + if (this.usedNamesInExecution.has(fullKey)) { + throw new HistoryDivergedError( + `Duplicate entry name "${name}" at location "${locationToKey(this.storage, this.currentLocation)}". ` + + `Each step/loop/sleep/queue.next/join/race must have a unique name within its scope.`, + ); + } this.usedNamesInExecution.add(fullKey); } @@ -953,66 +962,54 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.rollbackCheckpointSet = true; } - // === Listen === - // - // IMPORTANT: Messages are loaded once at workflow start (in loadStorage). - // If a message is sent via handle.message() DURING workflow execution, - // it won't be visible until the next execution. The workflow will yield - // (SleepError/MessageWaitError), then on the next run, loadStorage() will - // pick up the new message. This is intentional - no polling during execution. - - async listen( - name: string, - messageName: string | string[], - ): Promise> { - this.assertNotInProgress(); - this.checkEvicted(); + // === Queue === - this.entryInProgress = true; - try { - const messages = await this.executeListenN(name, messageName, 1); - const message = messages[0]; - if (!message) { - throw new HistoryDivergedError("Expected message for listen()"); - } - return this.toListenMessage(message); - } finally { - this.entryInProgress = false; + private async queueSend(name: string, body: unknown): Promise { + const message: Message = { + id: crypto.randomUUID(), + name, + data: body, + sentAt: Date.now(), + }; + if (!this.messageDriver.receiveMessages) { + this.storage.messages.push(message); } + await this.messageDriver.addMessage(message); } - async listenN( + private async queueNext( name: string, - messageName: string, - limit: number, - ): Promise { + opts?: WorkflowQueueNextOptions, + ): Promise>> { this.assertNotInProgress(); this.checkEvicted(); this.entryInProgress = true; try { - const messages = await this.executeListenN(name, messageName, limit); - await Promise.all( - messages.map((message) => this.completeConsumedMessage(message)), - ); - return messages.map((message) => message.data as T); + return await this.executeQueueNext(name, opts); } finally { this.entryInProgress = false; } } - private async executeListenN( + private async executeQueueNext( name: string, - messageName: string | string[], - limit: number, - ): Promise { - const messageNames = this.normalizeMessageNames(messageName); + opts?: WorkflowQueueNextOptions, + ): Promise>> { + if (this.pendingCompletableMessageIds.size > 0) { + throw new Error( + "Previous completable queue message is not completed. Call `message.complete(...)` before receiving the next message.", + ); + } + + const resolvedOpts = opts ?? {}; + const messageNames = this.normalizeQueueNames(resolvedOpts.names); const messageNameLabel = this.messageNamesLabel(messageNames); + const count = Math.max(1, resolvedOpts.count ?? 1); + const completable = resolvedOpts.completable === true; - // Check for duplicate name in current execution this.checkDuplicateName(name); - // Check for replay: first check if we have a count entry const countLocation = appendName( this.storage, this.currentLocation, @@ -1020,99 +1017,103 @@ export class WorkflowContextImpl implements WorkflowContextInterface { ); const countKey = locationToKey(this.storage, countLocation); const existingCount = this.storage.history.entries.get(countKey); - - // Mark count entry as visited this.markVisited(countKey); - this.stopRollbackIfMissing(existingCount); - if (existingCount && existingCount.kind.type === "message") { - // Replay: read all recorded messages - const count = existingCount.kind.data.data as number; - const results: Message[] = []; - - for (let i = 0; i < count; i++) { - const messageLocation = appendName( - this.storage, - this.currentLocation, - `${name}:${i}`, - ); - const messageKey = locationToKey(this.storage, messageLocation); + let deadline: number | undefined; + let deadlineEntry: Entry | undefined; + if (resolvedOpts.timeout !== undefined) { + const deadlineLocation = appendName( + this.storage, + this.currentLocation, + `${name}:deadline`, + ); + const deadlineKey = locationToKey(this.storage, deadlineLocation); + deadlineEntry = this.storage.history.entries.get(deadlineKey); + this.markVisited(deadlineKey); + this.stopRollbackIfMissing(deadlineEntry); + + if (deadlineEntry && deadlineEntry.kind.type === "sleep") { + deadline = deadlineEntry.kind.data.deadline; + } else { + deadline = Date.now() + resolvedOpts.timeout; + const created = createEntry(deadlineLocation, { + type: "sleep", + data: { deadline, state: "pending" }, + }); + setEntry(this.storage, deadlineLocation, created); + created.dirty = true; + await this.flushStorage(); + deadlineEntry = created; + } + } - // Mark each message entry as visited - this.markVisited(messageKey); + if (existingCount && existingCount.kind.type === "message") { + const replayCount = existingCount.kind.data.data as number; + return await this.readReplayQueueMessages( + name, + replayCount, + completable, + ); + } - const existingMessage = - this.storage.history.entries.get(messageKey); - if ( - existingMessage && - existingMessage.kind.type === "message" - ) { - results.push( - this.fromHistoryListenMessage( - existingMessage.kind.data.name, - existingMessage.kind.data.data, - ), - ); - } + const now = Date.now(); + if (deadline !== undefined && now >= deadline) { + if (deadlineEntry && deadlineEntry.kind.type === "sleep") { + deadlineEntry.kind.data.state = "completed"; + deadlineEntry.dirty = true; } - - return results; + await this.recordQueueCountEntry( + countLocation, + `${messageNameLabel}:count`, + 0, + ); + return []; } - // Try to consume messages immediately - const messages = await consumeMessages( - this.storage, - this.messageDriver, + const received = await this.receiveMessagesNow( messageNames, - limit, + count, + completable, ); - - if (messages.length > 0) { - // Record each message in history with indexed names - for (let i = 0; i < messages.length; i++) { - const messageLocation = appendName( - this.storage, - this.currentLocation, - `${name}:${i}`, - ); - const messageEntry = createEntry(messageLocation, { - type: "message", - data: { - name: messages[i].name, - data: this.toHistoryListenMessage(messages[i]), - }, - }); - setEntry(this.storage, messageLocation, messageEntry); - - // Mark as visited - this.markVisited(locationToKey(this.storage, messageLocation)); + if (received.length > 0) { + const historyMessages = received.map((message) => + this.toWorkflowQueueMessage(message), + ); + if (deadlineEntry && deadlineEntry.kind.type === "sleep") { + deadlineEntry.kind.data.state = "interrupted"; + deadlineEntry.dirty = true; } - - // Record the count for replay - const countEntry = createEntry(countLocation, { - type: "message", - data: { - name: `${messageNameLabel}:count`, - data: messages.length, - }, - }); - setEntry(this.storage, countLocation, countEntry); - - await this.flushStorage(); - - return messages; + await this.recordQueueMessages( + name, + countLocation, + messageNames, + historyMessages, + ); + const queueMessages = received.map((message, index) => + this.createQueueMessage(message, completable, { + historyLocation: appendName( + this.storage, + this.currentLocation, + `${name}:${index}`, + ), + }), + ); + return queueMessages; } - // No messages found, throw to yield to scheduler - throw new MessageWaitError(messageNames); + if (deadline === undefined) { + throw new MessageWaitError(messageNames); + } + throw new SleepError(deadline, messageNames); } - private normalizeMessageNames(messageName: string | string[]): string[] { - const names = Array.isArray(messageName) ? messageName : [messageName]; + private normalizeQueueNames(names?: readonly string[]): string[] { + if (!names || names.length === 0) { + return []; + } const deduped: string[] = []; const seen = new Set(); - for (const name of names) { if (seen.has(name)) { continue; @@ -1120,466 +1121,280 @@ export class WorkflowContextImpl implements WorkflowContextInterface { seen.add(name); deduped.push(name); } - - if (deduped.length === 0) { - throw new Error("listen() requires at least one message name"); - } - return deduped; } private messageNamesLabel(messageNames: string[]): string { + if (messageNames.length === 0) { + return "*"; + } return messageNames.length === 1 ? messageNames[0] : messageNames.join("|"); } - private toListenMessage(message: Message): WorkflowListenMessage { - return { - id: message.id, - name: message.name, - body: message.data as T, - complete: async (response?: unknown) => { - if (message.complete) { - await message.complete(response); - return; - } - if (this.messageDriver.completeMessage) { - await this.messageDriver.completeMessage(message.id, response); - } - }, - }; - } - - private async completeConsumedMessage(message: Message): Promise { - if (message.complete) { - await message.complete(); - return; - } - if (message.id && this.messageDriver.completeMessage) { - await this.messageDriver.completeMessage(message.id); - } - } - - private toHistoryListenMessage(message: Message): unknown { - return { - [LISTEN_HISTORY_MESSAGE_MARKER]: 1, - id: message.id, - name: message.name, - body: message.data, - }; - } - - private fromHistoryListenMessage(name: string, value: unknown): Message { - if ( - typeof value === "object" && - value !== null && - (value as Record)[LISTEN_HISTORY_MESSAGE_MARKER] === 1 - ) { - const serialized = value as Record; - const id = - typeof serialized.id === "string" ? serialized.id : ""; - const serializedName = - typeof serialized.name === "string" ? serialized.name : name; - const complete = async (response?: unknown) => { - if (!id || !this.messageDriver.completeMessage) { - return; - } - await this.messageDriver.completeMessage(id, response); - }; - - return { - id, - name: serializedName, - data: serialized.body, - sentAt: 0, - complete, - }; - } - - return { - id: "", - name, - data: value, - sentAt: 0, - }; - } - - async listenWithTimeout( - name: string, - messageName: string, - timeoutMs: number, - ): Promise { - const deadline = Date.now() + timeoutMs; - return this.listenUntil(name, messageName, deadline); - } - - async listenUntil( - name: string, - messageName: string, - timestampMs: number, - ): Promise { - this.assertNotInProgress(); - this.checkEvicted(); - - this.entryInProgress = true; - try { - return await this.executeListenUntil( - name, - messageName, - timestampMs, - ); - } finally { - this.entryInProgress = false; - } - } - - private async executeListenUntil( - name: string, - messageName: string, - deadline: number, - ): Promise { - // Check for duplicate name in current execution - this.checkDuplicateName(name); - - const sleepLocation = appendName( - this.storage, - this.currentLocation, - name, - ); - const messageLocation = appendName( - this.storage, - this.currentLocation, - `${name}:message`, - ); - const sleepKey = locationToKey(this.storage, sleepLocation); - const messageKey = locationToKey(this.storage, messageLocation); - - // Mark entries as visited for validateComplete - this.markVisited(sleepKey); - this.markVisited(messageKey); - - const existingSleep = this.storage.history.entries.get(sleepKey); - - this.stopRollbackIfMissing(existingSleep); - - // Check for replay - if (existingSleep && existingSleep.kind.type === "sleep") { - const sleepData = existingSleep.kind.data; - if (sleepData.state === "completed") { - return null; - } - - if (sleepData.state === "interrupted") { - const existingMessage = this.storage.history.entries.get(messageKey); - if ( - existingMessage && - existingMessage.kind.type === "message" - ) { - const replayedMessage = this.fromHistoryListenMessage( - existingMessage.kind.data.name, - existingMessage.kind.data.data, - ); - await this.completeConsumedMessage(replayedMessage); - return replayedMessage.data as T; - } - throw new HistoryDivergedError( - "Expected message entry after interrupted sleep", - ); - } - - this.stopRollbackIfIncomplete(true); - - deadline = sleepData.deadline; - } else { - this.stopRollbackIfIncomplete(true); - - // Create sleep entry - const sleepEntry = createEntry(sleepLocation, { - type: "sleep", - data: { deadline, state: "pending" }, + private async receiveMessagesNow( + messageNames: string[], + count: number, + completable: boolean, + ): Promise { + if (this.messageDriver.receiveMessages) { + return await this.messageDriver.receiveMessages({ + names: messageNames.length > 0 ? messageNames : undefined, + count, + completable, }); - setEntry(this.storage, sleepLocation, sleepEntry); - sleepEntry.dirty = true; - await this.flushStorage(); } - - const now = Date.now(); - const remaining = deadline - now; - - // Deadline passed, check for message one more time - if (remaining <= 0) { - const message = await consumeMessage( + if (completable) { + return peekMessages( this.storage, - this.messageDriver, - messageName, + messageNames.length > 0 ? messageNames : [], + count, ); - const sleepEntry = getEntry(this.storage, sleepLocation)!; - - if (message) { - if (sleepEntry.kind.type === "sleep") { - sleepEntry.kind.data.state = "interrupted"; - } - sleepEntry.dirty = true; - - const messageEntry = createEntry(messageLocation, { - type: "message", - data: { - name: message.name, - data: this.toHistoryListenMessage(message), - }, - }); - setEntry(this.storage, messageLocation, messageEntry); - await this.flushStorage(); - await this.completeConsumedMessage(message); - - return message.data as T; - } - - if (sleepEntry.kind.type === "sleep") { - sleepEntry.kind.data.state = "completed"; - } - sleepEntry.dirty = true; - await this.flushStorage(); - return null; } - - // Check for message (messages are loaded at workflow start, no polling needed) - const message = await consumeMessage( + return await consumeMessages( this.storage, this.messageDriver, - messageName, + messageNames.length > 0 ? messageNames : [], + count, ); - if (message) { - const sleepEntry = getEntry(this.storage, sleepLocation)!; - if (sleepEntry.kind.type === "sleep") { - sleepEntry.kind.data.state = "interrupted"; - } - sleepEntry.dirty = true; + } + private async recordQueueMessages( + name: string, + countLocation: Location, + messageNames: string[], + messages: Array>, + ): Promise { + for (let i = 0; i < messages.length; i++) { + const messageLocation = appendName( + this.storage, + this.currentLocation, + `${name}:${i}`, + ); const messageEntry = createEntry(messageLocation, { type: "message", data: { - name: message.name, - data: this.toHistoryListenMessage(message), + name: messages[i].name, + data: this.toHistoryQueueMessage(messages[i]), }, }); setEntry(this.storage, messageLocation, messageEntry); - await this.flushStorage(); - await this.completeConsumedMessage(message); - - return message.data as T; + this.markVisited(locationToKey(this.storage, messageLocation)); } + await this.recordQueueCountEntry( + countLocation, + `${this.messageNamesLabel(messageNames)}:count`, + messages.length, + ); + } - // Message not available, yield to scheduler until deadline or message - throw new SleepError(deadline, [messageName]); + private async recordQueueCountEntry( + countLocation: Location, + countLabel: string, + count: number, + ): Promise { + const countEntry = createEntry(countLocation, { + type: "message", + data: { + name: countLabel, + data: count, + }, + }); + setEntry(this.storage, countLocation, countEntry); + await this.flushStorage(); } - async listenNWithTimeout( + private async readReplayQueueMessages( name: string, - messageName: string, - limit: number, - timeoutMs: number, - ): Promise { - this.assertNotInProgress(); - this.checkEvicted(); - - this.entryInProgress = true; - try { - return await this.executeListenNWithTimeout( - name, - messageName, - limit, - timeoutMs, + count: number, + completable: boolean, + ): Promise>> { + const results: Array> = []; + for (let i = 0; i < count; i++) { + const messageLocation = appendName( + this.storage, + this.currentLocation, + `${name}:${i}`, + ); + const messageKey = locationToKey(this.storage, messageLocation); + this.markVisited(messageKey); + const existingMessage = this.storage.history.entries.get(messageKey); + if (!existingMessage || existingMessage.kind.type !== "message") { + throw new HistoryDivergedError( + `Expected queue message "${name}:${i}" in history`, + ); + } + const parsed = this.fromHistoryQueueMessage( + existingMessage.kind.data.name, + existingMessage.kind.data.data, + ); + results.push( + this.createQueueMessage(parsed.message, completable, { + historyLocation: messageLocation, + completed: parsed.completed, + replay: true, + }), ); - } finally { - this.entryInProgress = false; } + return results; } - private async executeListenNWithTimeout( - name: string, - messageName: string, - limit: number, - timeoutMs: number, - ): Promise { - // Check for duplicate name in current execution - this.checkDuplicateName(name); - - // Use a sleep entry to store the deadline for replay - const sleepLocation = appendName( - this.storage, - this.currentLocation, - `${name}:deadline`, - ); - const sleepKey = locationToKey(this.storage, sleepLocation); - const existingSleep = this.storage.history.entries.get(sleepKey); + private toWorkflowQueueMessage(message: Message): WorkflowQueueMessage { + return { + id: message.id, + name: message.name, + body: message.data as T, + createdAt: message.sentAt, + }; + } - this.markVisited(sleepKey); + private createQueueMessage( + message: Message, + completable: boolean, + opts?: { + historyLocation?: Location; + completed?: boolean; + replay?: boolean; + }, + ): WorkflowQueueMessage { + const queueMessage = this.toWorkflowQueueMessage(message); + if (!completable) { + return queueMessage; + } + + if (opts?.replay && opts.completed) { + return { + ...queueMessage, + complete: async () => { + // No-op: this message was already completed in a prior run. + }, + }; + } - this.stopRollbackIfMissing(existingSleep); + const messageId = message.id; + this.pendingCompletableMessageIds.add(messageId); + let completed = false; - let deadline: number; + return { + ...queueMessage, + complete: async (response?: unknown) => { + if (completed) { + throw new Error("Queue message already completed"); + } + completed = true; + try { + await this.completeMessage(message, response); + await this.markQueueMessageCompleted(opts?.historyLocation); + this.pendingCompletableMessageIds.delete(messageId); + } catch (error) { + completed = false; + throw error; + } + }, + }; + } - if (existingSleep && existingSleep.kind.type === "sleep") { - // Replay: use stored deadline - deadline = existingSleep.kind.data.deadline; - } else { - // New execution: calculate and store deadline - deadline = Date.now() + timeoutMs; - const sleepEntry = createEntry(sleepLocation, { - type: "sleep", - data: { deadline, state: "pending" }, - }); - setEntry(this.storage, sleepLocation, sleepEntry); - sleepEntry.dirty = true; - // Flush immediately to persist deadline before potential SleepError - await this.flushStorage(); + private async markQueueMessageCompleted( + historyLocation: Location | undefined, + ): Promise { + if (!historyLocation) { + return; } - - return this.executeListenNUntilImpl( - name, - messageName, - limit, - deadline, + const key = locationToKey(this.storage, historyLocation); + const entry = this.storage.history.entries.get(key); + if (!entry || entry.kind.type !== "message") { + return; + } + const parsed = this.fromHistoryQueueMessage( + entry.kind.data.name, + entry.kind.data.data, + ); + entry.kind.data.data = this.toHistoryQueueMessage( + this.toWorkflowQueueMessage(parsed.message), + true, ); + entry.dirty = true; + await this.flushStorage(); } - async listenNUntil( - name: string, - messageName: string, - limit: number, - timestampMs: number, - ): Promise { - this.assertNotInProgress(); - this.checkEvicted(); - - // Check for duplicate name in current execution - this.checkDuplicateName(name); - - this.entryInProgress = true; - try { - return await this.executeListenNUntilImpl( - name, - messageName, - limit, - timestampMs, - ); - } finally { - this.entryInProgress = false; + private async completeMessage( + message: Message, + response?: unknown, + ): Promise { + if (message.complete) { + await message.complete(response); + return; } - } - - /** - * Internal implementation for listenNUntil with proper replay support. - * Stores the count and individual messages for deterministic replay. - */ - private async executeListenNUntilImpl( - name: string, - messageName: string, - limit: number, - deadline: number, - ): Promise { - // Check for replay: look for count entry - const countLocation = appendName( - this.storage, - this.currentLocation, - `${name}:count`, + if (this.messageDriver.completeMessage) { + await this.messageDriver.completeMessage(message.id, response); + return; + } + const deleted = await this.messageDriver.deleteMessages([message.id]); + if (!deleted.includes(message.id)) { + return; + } + const idx = this.storage.messages.findIndex((entry) => + entry.id === message.id ); - const countKey = locationToKey(this.storage, countLocation); - const existingCount = this.storage.history.entries.get(countKey); - - this.markVisited(countKey); - - this.stopRollbackIfMissing(existingCount); - - if (existingCount && existingCount.kind.type === "message") { - // Replay: read all recorded messages - const count = existingCount.kind.data.data as number; - const results: T[] = []; - - for (let i = 0; i < count; i++) { - const messageLocation = appendName( - this.storage, - this.currentLocation, - `${name}:${i}`, - ); - const messageKey = locationToKey(this.storage, messageLocation); - - this.markVisited(messageKey); - - const existingMessage = this.storage.history.entries.get(messageKey); - if ( - existingMessage && - existingMessage.kind.type === "message" - ) { - const replayedMessage = this.fromHistoryListenMessage( - existingMessage.kind.data.name, - existingMessage.kind.data.data, - ); - await this.completeConsumedMessage(replayedMessage); - results.push(replayedMessage.data as T); - } - } - - return results; + if (idx !== -1) { + this.storage.messages.splice(idx, 1); } + } - // New execution: collect messages until timeout or limit reached - const results: T[] = []; - - for (let i = 0; i < limit; i++) { - const now = Date.now(); - if (now >= deadline) { - break; - } - - // Try to consume a message - const message = await consumeMessage( - this.storage, - this.messageDriver, - messageName, - ); - if (!message) { - // No message available - check if we should wait - if (results.length === 0) { - // No messages yet - yield to scheduler until deadline or message - throw new SleepError(deadline, [messageName]); - } - // We have some messages - return what we have - break; - } + private toHistoryQueueMessage( + message: WorkflowQueueMessage, + completed = false, + ): unknown { + return { + [QUEUE_HISTORY_MESSAGE_MARKER]: 1, + id: message.id, + name: message.name, + body: message.body, + createdAt: message.createdAt, + completed, + }; + } - // Record the message - const messageLocation = appendName( - this.storage, - this.currentLocation, - `${name}:${i}`, - ); - const messageEntry = createEntry(messageLocation, { - type: "message", - data: { - name: message.name, - data: this.toHistoryListenMessage(message), + private fromHistoryQueueMessage( + name: string, + value: unknown, + ): { message: Message; completed: boolean } { + if ( + typeof value === "object" && + value !== null && + (value as Record)[QUEUE_HISTORY_MESSAGE_MARKER] === 1 + ) { + const serialized = value as Record; + const id = + typeof serialized.id === "string" ? serialized.id : ""; + const serializedName = + typeof serialized.name === "string" ? serialized.name : name; + const createdAt = + typeof serialized.createdAt === "number" ? serialized.createdAt : 0; + const completed = + typeof serialized.completed === "boolean" + ? serialized.completed + : false; + return { + message: { + id, + name: serializedName, + data: serialized.body, + sentAt: createdAt, }, - }); - setEntry(this.storage, messageLocation, messageEntry); - this.markVisited(locationToKey(this.storage, messageLocation)); - await this.completeConsumedMessage(message); - - results.push(message.data as T); + completed, + }; } - - // Record the count for replay - const countEntry = createEntry(countLocation, { - type: "message", - data: { name: `${messageName}:count`, data: results.length }, - }); - setEntry(this.storage, countLocation, countEntry); - - await this.flushStorage(); - - return results; + return { + message: { + id: "", + name, + data: value, + sentAt: 0, + }, + completed: false, + }; } // === Join === diff --git a/rivetkit-typescript/packages/workflow-engine/src/index.ts b/rivetkit-typescript/packages/workflow-engine/src/index.ts index e9f231a3c8..511a6a1109 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/index.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/index.ts @@ -100,7 +100,9 @@ export type { WorkflowContextInterface, WorkflowFunction, WorkflowHandle, - WorkflowListenMessage, + WorkflowQueue, + WorkflowQueueMessage, + WorkflowQueueNextOptions, WorkflowMessageDriver, WorkflowResult, WorkflowRunMode, @@ -200,9 +202,13 @@ function notifyMessage(runtime: LiveRuntime, name: string): void { for (let i = 0; i < runtime.messageWaiters.length; i++) { const waiter = runtime.messageWaiters[i]; - const matchIndex = runtime.pendingMessageNames.findIndex((pending) => - waiter.names.includes(pending), - ); + const matchIndex = waiter.names.length === 0 + ? runtime.pendingMessageNames.length > 0 + ? 0 + : -1 + : runtime.pendingMessageNames.findIndex((pending) => + waiter.names.includes(pending) + ); if (matchIndex !== -1) { runtime.pendingMessageNames.splice(matchIndex, 1); runtime.messageWaiters.splice(i, 1); @@ -221,9 +227,13 @@ async function waitForMessage( throw new EvictedError(); } - const matchIndex = runtime.pendingMessageNames.findIndex((pending) => - names.includes(pending), - ); + const matchIndex = names.length === 0 + ? runtime.pendingMessageNames.length > 0 + ? 0 + : -1 + : runtime.pendingMessageNames.findIndex((pending) => + names.includes(pending) + ); if (matchIndex !== -1) { runtime.pendingMessageNames.splice(matchIndex, 1); return; @@ -525,12 +535,11 @@ async function executeLiveWorkflow( return result; } - const hasMessages = - result.waitingForMessages && result.waitingForMessages.length > 0; - const hasDeadline = result.sleepUntil !== undefined; + const hasMessages = result.waitingForMessages !== undefined; + const hasDeadline = result.sleepUntil !== undefined; if (hasMessages && hasDeadline) { - // Wait for EITHER a message OR the deadline (for listenWithTimeout) + // Wait for EITHER a message OR the deadline (for queue.next timeout) try { const messagePromise = driver.waitForMessages ? awaitWithEviction( diff --git a/rivetkit-typescript/packages/workflow-engine/src/storage.ts b/rivetkit-typescript/packages/workflow-engine/src/storage.ts index 8c6fff7965..62c3671f19 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/storage.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/storage.ts @@ -398,6 +398,31 @@ export async function consumeMessage( return messages[0] ?? null; } +/** + * Peek up to N messages from the queue without consuming them. + */ +export function peekMessages( + storage: Storage, + messageName: string | string[], + limit: number, +): Message[] { + const messageNameSet = new Set( + Array.isArray(messageName) ? messageName : [messageName], + ); + const includeAll = messageNameSet.size === 0; + const results: Message[] = []; + for (const message of storage.messages) { + if (!includeAll && !messageNameSet.has(message.name)) { + continue; + } + results.push(message); + if (results.length >= limit) { + break; + } + } + return results; +} + /** * Consume up to N messages from the queue. * @@ -415,13 +440,14 @@ export async function consumeMessages( const messageNameSet = new Set( Array.isArray(messageName) ? messageName : [messageName], ); + const includeAll = messageNameSet.size === 0; // Find all matching messages up to limit (don't modify memory yet) const toConsume: { message: Message; index: number }[] = []; let count = 0; for (let i = 0; i < storage.messages.length && count < limit; i++) { - if (messageNameSet.has(storage.messages[i].name)) { + if (includeAll || messageNameSet.has(storage.messages[i].name)) { toConsume.push({ message: storage.messages[i], index: i }); count++; } diff --git a/rivetkit-typescript/packages/workflow-engine/src/types.ts b/rivetkit-typescript/packages/workflow-engine/src/types.ts index 9b2d918b35..335d249d68 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/types.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/types.ts @@ -204,13 +204,45 @@ export interface Message { } /** - * Message handle returned by listen(). + * Options for receiving queue messages in workflows. */ -export interface WorkflowListenMessage { - id: string; +export interface WorkflowQueueNextOptions { + /** + * Queue names to receive from. + * If omitted, receives from all queue names. + */ + names?: readonly string[]; + /** Maximum number of messages to receive. Defaults to 1. */ + count?: number; + /** + * Timeout in milliseconds. + * Omit to wait indefinitely. + */ + timeout?: number; + /** Whether returned messages must be manually completed. */ + completable?: boolean; +} + +/** + * Message returned by workflow queue operations. + */ +export interface WorkflowQueueMessage { + id: string | bigint; name: string; - body: T; - complete(response?: unknown): Promise; + body: TBody; + createdAt: number; + complete?(response?: unknown): Promise; +} + +/** + * Workflow queue interface. + */ +export interface WorkflowQueue { + next( + name: string, + opts?: WorkflowQueueNextOptions, + ): Promise>>; + send(name: string, body: unknown): Promise; } /** @@ -276,6 +308,17 @@ export interface Storage { export interface WorkflowMessageDriver { loadMessages(): Promise; addMessage(message: Message): Promise; + /** + * Optionally receive messages directly from the host queue implementation. + * This is used by actor-backed workflows to reuse native queue behavior. + * + * The operation must be non-blocking and return immediately. + */ + receiveMessages?(opts: { + names?: readonly string[]; + count: number; + completable: boolean; + }): Promise; /** * Delete the specified messages and return the IDs that were successfully removed. */ @@ -353,6 +396,7 @@ export type BranchOutput = T extends BranchConfig ? O : never; export interface WorkflowContextInterface { readonly workflowId: string; readonly abortSignal: AbortSignal; + readonly queue: WorkflowQueue; step(name: string, run: () => Promise): Promise; step(config: StepConfig): Promise; @@ -368,34 +412,6 @@ export interface WorkflowContextInterface { sleep(name: string, durationMs: number): Promise; sleepUntil(name: string, timestampMs: number): Promise; - listen( - name: string, - messageName: string | string[], - ): Promise>; - listenN(name: string, messageName: string, limit: number): Promise; - listenWithTimeout( - name: string, - messageName: string, - timeoutMs: number, - ): Promise; - listenUntil( - name: string, - messageName: string, - timestampMs: number, - ): Promise; - listenNWithTimeout( - name: string, - messageName: string, - limit: number, - timeoutMs: number, - ): Promise; - listenNUntil( - name: string, - messageName: string, - limit: number, - timestampMs: number, - ): Promise; - rollbackCheckpoint(name: string): Promise; join>>( diff --git a/rivetkit-typescript/packages/workflow-engine/tests/handle.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/handle.test.ts index 149da0baa0..7a166fe6c9 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/handle.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/handle.test.ts @@ -18,7 +18,12 @@ for (const mode of modes) { it("should send messages via handle", async () => { const workflow = async (ctx: WorkflowContextInterface) => { - const message = await ctx.listen("wait", "message-name"); + const [message] = await ctx.queue.next("wait", { + names: ["message-name"], + }); + if (!message) { + throw new Error("Expected message"); + } return message.body; }; diff --git a/rivetkit-typescript/packages/workflow-engine/tests/messages.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/messages.test.ts index 27da0e0467..b67fefbcf4 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/messages.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/messages.test.ts @@ -31,10 +31,12 @@ for (const mode of modes) { it("should wait for messages", async () => { const workflow = async (ctx: WorkflowContextInterface) => { - const message = await ctx.listen( - "wait-message", - "my-message", - ); + const [message] = await ctx.queue.next("wait-message", { + names: ["my-message"], + }); + if (!message) { + throw new Error("Expected message"); + } return message.body; }; @@ -55,12 +57,14 @@ for (const mode of modes) { expect(result.output).toBe("payload"); }); - it("should listen for any message in a name set", async () => { + it("should wait for any message in a name set", async () => { const workflow = async (ctx: WorkflowContextInterface) => { - const message = await ctx.listen("wait-many", [ - "first", - "second", - ]); + const [message] = await ctx.queue.next("wait-many", { + names: ["first", "second"], + }); + if (!message) { + throw new Error("Expected message"); + } return { name: message.name, body: message.body }; }; @@ -107,13 +111,15 @@ for (const mode of modes) { ), ); - const workflow = async (ctx: WorkflowContextInterface) => { - const message = await ctx.listen( - "wait-message", - "my-message", - ); - return message.body; - }; + const workflow = async (ctx: WorkflowContextInterface) => { + const [message] = await ctx.queue.next("wait-message", { + names: ["my-message"], + }); + if (!message) { + throw new Error("Expected message"); + } + return message.body; + }; const result = await runWorkflow( "wf-1", @@ -129,7 +135,7 @@ for (const mode of modes) { expect(result.output).toBe("hello"); }); - it("listen should return a completable message handle", async () => { + it("queue.next should return completable messages", async () => { const completions: Array<{ id: string; response?: unknown }> = []; const pending = [ buildMessagePayload("my-message", "hello", "msg-1") as { @@ -165,11 +171,20 @@ for (const mode of modes) { }; driver.messageDriver = messageDriver; - const workflow = async (ctx: WorkflowContextInterface) => { - const message = await ctx.listen("wait-message", "my-message"); - await message.complete({ ok: true }); - return message.body; - }; + const workflow = async (ctx: WorkflowContextInterface) => { + const [message] = await ctx.queue.next("wait-message", { + names: ["my-message"], + completable: true, + }); + if (!message) { + throw new Error("Expected message"); + } + if (!message.complete) { + throw new Error("Expected completable message"); + } + await message.complete({ ok: true }); + return message.body; + }; const result = await runWorkflow( "wf-1", @@ -186,7 +201,142 @@ for (const mode of modes) { expect(completions).toEqual([{ id: "msg-1", response: { ok: true } }]); }); - it("should collect multiple messages with listenN", async () => { + it("replay should not block the next completable queue.next", async () => { + if (mode !== "yield") { + return; + } + + await driver.set( + buildMessageKey("msg-1"), + serializeMessage(buildMessagePayload("my-message", "one", "msg-1")), + ); + await driver.set( + buildMessageKey("msg-2"), + serializeMessage(buildMessagePayload("my-message", "two", "msg-2")), + ); + + const workflow = async (ctx: WorkflowContextInterface) => { + const [first] = await ctx.queue.next("wait-first", { + names: ["my-message"], + completable: true, + }); + if (!first || !first.complete) { + throw new Error("Expected first completable message"); + } + const completeFirst = first.complete; + await ctx.step("complete-first", async () => { + await completeFirst({ ok: "first" }); + return first.body; + }); + + await ctx.sleep("between", 120); + + const [second] = await ctx.queue.next("wait-second", { + names: ["my-message"], + completable: true, + }); + if (!second || !second.complete) { + throw new Error("Expected second completable message"); + } + const completeSecond = second.complete; + await ctx.step("complete-second", async () => { + await completeSecond({ ok: "second" }); + return second.body; + }); + + return [first.body, second.body] as const; + }; + + const firstRun = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(firstRun.state).toBe("sleeping"); + + await new Promise((resolve) => setTimeout(resolve, 140)); + + const secondRun = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(secondRun.state).toBe("completed"); + expect(secondRun.output).toEqual(["one", "two"]); + }); + + it("replay should keep blocking if completable message was not completed", async () => { + if (mode !== "yield") { + return; + } + + await driver.set( + buildMessageKey("msg-1"), + serializeMessage( + buildMessagePayload("my-message", "one", "msg-1"), + ), + ); + await driver.set( + buildMessageKey("msg-2"), + serializeMessage( + buildMessagePayload("my-message", "two", "msg-2"), + ), + ); + + const workflow = async (ctx: WorkflowContextInterface) => { + const [first] = await ctx.queue.next("wait-first", { + names: ["my-message"], + completable: true, + }); + if (!first || !first.complete) { + throw new Error("Expected first completable message"); + } + + // Intentionally do not complete the message. + await ctx.sleep("between", 120); + + await ctx.queue.next("wait-second", { + names: ["my-message"], + completable: true, + }); + + return first.body; + }; + + const firstRun = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(firstRun.state).toBe("sleeping"); + + await new Promise((resolve) => setTimeout(resolve, 140)); + + const secondRunHandle = runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ); + await expect(secondRunHandle.result).rejects.toThrow( + "Previous completable queue message is not completed.", + ); + + const queued = await driver.messageDriver.loadMessages(); + expect(queued.map((message) => message.id).sort()).toEqual([ + "msg-1", + "msg-2", + ]); + }); + + it("should collect multiple messages with queue.next count", async () => { await driver.set( buildMessageKey("1"), serializeMessage(buildMessagePayload("batch", "a", "1")), @@ -196,9 +346,13 @@ for (const mode of modes) { serializeMessage(buildMessagePayload("batch", "b", "2")), ); - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.listenN("batch-wait", "batch", 2); - }; + const workflow = async (ctx: WorkflowContextInterface) => { + const messages = await ctx.queue.next("batch-wait", { + names: ["batch"], + count: 2, + }); + return messages.map((message) => message.body); + }; const result = await runWorkflow( "wf-1", @@ -214,14 +368,14 @@ for (const mode of modes) { expect(result.output).toEqual(["a", "b"]); }); - it("should time out listenWithTimeout", async () => { - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.listenWithTimeout( - "timeout", - "missing", - 50, - ); - }; + it("should time out queue.next", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + const messages = await ctx.queue.next("timeout", { + names: ["missing"], + timeout: 50, + }); + return messages[0]?.body ?? null; + }; if (mode === "yield") { const result1 = await runWorkflow( @@ -258,7 +412,7 @@ for (const mode of modes) { expect(result.output).toBeNull(); }); - it("should return a message before listenUntil deadline", async () => { + it("should return a message before queue.next timeout", async () => { const messageId = generateId(); await driver.set( buildMessageKey(messageId), @@ -267,13 +421,13 @@ for (const mode of modes) { ), ); - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.listenUntil( - "deadline", - "deadline", - Date.now() + 1000, - ); - }; + const workflow = async (ctx: WorkflowContextInterface) => { + const messages = await ctx.queue.next("deadline", { + names: ["deadline"], + timeout: 1000, + }); + return messages[0]?.body ?? null; + }; const result = await runWorkflow( "wf-1", @@ -289,15 +443,15 @@ for (const mode of modes) { expect(result.output).toBe("data"); }); - it("should wait for listenNWithTimeout messages", async () => { - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.listenNWithTimeout( - "batch", - "batch", - 2, - 5000, - ); - }; + it("should wait for queue.next timeout messages", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + const messages = await ctx.queue.next("batch", { + names: ["batch"], + count: 2, + timeout: 5000, + }); + return messages.map((message) => message.body); + }; const handle = runWorkflow("wf-1", workflow, undefined, driver, { mode, @@ -327,7 +481,7 @@ for (const mode of modes) { expect(result.output).toEqual(["first"]); }); - it("should respect limits and FIFO ordering in listenNUntil", async () => { + it("should respect limits and FIFO ordering in queue.next", async () => { await driver.set( buildMessageKey("1"), serializeMessage(buildMessagePayload("fifo", "first", "1")), @@ -341,14 +495,14 @@ for (const mode of modes) { serializeMessage(buildMessagePayload("fifo", "third", "3")), ); - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.listenNUntil( - "fifo", - "fifo", - 2, - Date.now() + 1000, - ); - }; + const workflow = async (ctx: WorkflowContextInterface) => { + const messages = await ctx.queue.next("fifo", { + names: ["fifo"], + count: 2, + timeout: 1000, + }); + return messages.map((message) => message.body); + }; const result = await runWorkflow( "wf-1", @@ -381,9 +535,14 @@ for (const mode of modes) { return "ready"; }); - const message = await ctx.listen("wait", "mid"); - return message.body; - }; + const [message] = await ctx.queue.next("wait", { + names: ["mid"], + }); + if (!message) { + throw new Error("Expected message"); + } + return message.body; + }; const handle = runWorkflow("wf-1", workflow, undefined, driver, { mode, diff --git a/rivetkit-typescript/packages/workflow-engine/tests/removals.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/removals.test.ts index a6ec0165c7..356828ab57 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/removals.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/removals.test.ts @@ -47,9 +47,11 @@ for (const mode of modes) { return Loop.continue({ count: state.count + 1 }); }, }); - await ctx.sleep("old-sleep", 0); - await ctx.listen("old-listen", "old-message"); - await ctx.join("old-join", { + await ctx.sleep("old-sleep", 0); + await ctx.queue.next("old-listen", { + names: ["old-message"], + }); + await ctx.join("old-join", { branch: { run: async () => "ok", },