diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 72fcf28612..30a72a31cf 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -68,6 +68,7 @@ import { workflowCounterActor, workflowQueueActor, workflowSleepActor, + workflowStopTeardownActor, } from "./workflow"; // Consolidated setup with all actors @@ -155,6 +156,7 @@ export const registry = setup({ workflowQueueActor, workflowAccessActor, workflowSleepActor, + workflowStopTeardownActor, // From actor-db-raw.ts dbActorRaw, // From actor-db-drizzle.ts diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts index 220652333d..70d1b41a0d 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts @@ -182,4 +182,41 @@ export const workflowSleepActor = actor({ }, }); +export const workflowStopTeardownActor = actor({ + state: { + wakeAts: [] as number[], + sleepAts: [] as number[], + }, + queues: { + never: queue(), + }, + onWake: (c) => { + c.state.wakeAts.push(Date.now()); + }, + onSleep: (c) => { + c.state.sleepAts.push(Date.now()); + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "wait-forever", + run: async (loopCtx) => { + await loopCtx.queue.next("wait-for-never", { + names: ["never"], + }); + return Loop.continue(undefined); + }, + }); + }), + actions: { + getTimeline: (c) => ({ + wakeAts: [...c.state.wakeAts], + sleepAts: [...c.state.sleepAts], + }), + }, + options: { + sleepTimeout: 75, + runStopTimeout: 2_000, + }, +}); + export { WORKFLOW_QUEUE_NAME }; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index ef794abb9a..053ef373e9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -1272,6 +1272,16 @@ export class ActorInstance< if (runResult instanceof Promise) { this.#runPromise = runResult .then(() => { + if (this.#stopCalled) { + if (runSpan.isActive()) { + this.endTraceSpan(runSpan, { code: "OK" }); + } + this.#rLog.debug({ + msg: "run handler exited during actor stop", + }); + return; + } + // Run handler exited normally - this should crash the actor this.emitTraceEvent( "actor.crash", @@ -1288,6 +1298,17 @@ export class ActorInstance< this.startDestroy(); }) .catch((error) => { + if (this.#stopCalled) { + if (runSpan.isActive()) { + this.endTraceSpan(runSpan, { code: "OK" }); + } + this.#rLog.debug({ + msg: "run handler threw during actor stop", + error: stringifyError(error), + }); + return; + } + // Run handler threw an error - crash the actor this.emitTraceEvent( "actor.crash", diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts index 37a8472e6e..b81987f594 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts @@ -79,6 +79,27 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) { expect(next.ticks).toBeGreaterThan(initial.ticks); }); + test.skipIf(driverTestConfig.skip?.sleep)( + "workflow run teardown does not wait for runStopTimeout", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = client.workflowStopTeardownActor.getOrCreate([ + "workflow-stop-teardown", + ]); + + await actor.getTimeline(); + await waitFor(driverTestConfig, 1_200); + const timeline = await actor.getTimeline(); + + expect(timeline.wakeAts.length).toBeGreaterThanOrEqual(2); + expect(timeline.sleepAts.length).toBeGreaterThanOrEqual(1); + + const firstSleepDelayMs = + timeline.sleepAts[0] - timeline.wakeAts[0]; + expect(firstSleepDelayMs).toBeLessThan(1_000); + }, + ); + // NOTE: Test for workflow persistence across actor sleep is complex because // calling c.sleep() during a workflow prevents clean shutdown. The workflow // persistence is implicitly tested by the "sleeps and resumes between ticks" diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts index 23ef812b8c..ec5c063735 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts @@ -47,21 +47,21 @@ export function workflow< TEvents, TQueues >, -) => Promise { +) => Promise { const workflowInspector = createWorkflowInspectorAdapter(); async function run( runCtx: RunContext< TState, TConnParams, - TConnState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >, - ): Promise { + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise { const actor = ( runCtx as unknown as { [ACTOR_CONTEXT_INTERNAL_SYMBOL]?: AnyActorInstance; @@ -83,30 +83,28 @@ export function workflow< }, ); - runCtx.abortSignal.addEventListener( - "abort", - () => { - handle.evict(); - }, - { once: true }, - ); - - runCtx.waitUntil( - handle.result - .then(() => { - // Ignore normal completion; the actor will be restarted if needed. - }) - .catch((error) => { - runCtx.log.error({ - msg: "workflow run failed", - error: stringifyError(error), - }); - }), - ); + const onAbort = () => { + handle.evict(); + }; + if (runCtx.abortSignal.aborted) { + onAbort(); + } else { + runCtx.abortSignal.addEventListener("abort", onAbort, { + once: true, + }); + } - return await new Promise(() => { - // Intentionally never resolve to keep the run handler alive. - }); + try { + await handle.result; + } catch (error) { + runCtx.log.error({ + msg: "workflow run failed", + error: stringifyError(error), + }); + throw error; + } finally { + runCtx.abortSignal.removeEventListener("abort", onAbort); + } } const runWithConfig = run as typeof run & {