Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import {
workflowCounterActor,
workflowQueueActor,
workflowSleepActor,
workflowStopTeardownActor,
} from "./workflow";

// Consolidated setup with all actors
Expand Down Expand Up @@ -155,6 +156,7 @@ export const registry = setup({
workflowQueueActor,
workflowAccessActor,
workflowSleepActor,
workflowStopTeardownActor,
// From actor-db-raw.ts
dbActorRaw,
// From actor-db-drizzle.ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,41 @@ export const workflowSleepActor = actor({
},
});

export const workflowStopTeardownActor = actor({
state: {
wakeAts: [] as number[],
sleepAts: [] as number[],
},
queues: {
never: queue<unknown>(),
},
onWake: (c) => {
c.state.wakeAts.push(Date.now());
},
onSleep: (c) => {
c.state.sleepAts.push(Date.now());
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "wait-forever",
run: async (loopCtx) => {
await loopCtx.queue.next("wait-for-never", {
names: ["never"],
});
return Loop.continue(undefined);
},
});
}),
actions: {
getTimeline: (c) => ({
wakeAts: [...c.state.wakeAts],
sleepAts: [...c.state.sleepAts],
}),
},
options: {
sleepTimeout: 75,
runStopTimeout: 2_000,
},
});

export { WORKFLOW_QUEUE_NAME };
21 changes: 21 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,16 @@ export class ActorInstance<
if (runResult instanceof Promise) {
this.#runPromise = runResult
.then(() => {
if (this.#stopCalled) {
if (runSpan.isActive()) {
this.endTraceSpan(runSpan, { code: "OK" });
}
this.#rLog.debug({
msg: "run handler exited during actor stop",
});
return;
}

// Run handler exited normally - this should crash the actor
this.emitTraceEvent(
"actor.crash",
Expand All @@ -1288,6 +1298,17 @@ export class ActorInstance<
this.startDestroy();
})
.catch((error) => {
if (this.#stopCalled) {
if (runSpan.isActive()) {
this.endTraceSpan(runSpan, { code: "OK" });
}
this.#rLog.debug({
msg: "run handler threw during actor stop",
error: stringifyError(error),
});
return;
}

// Run handler threw an error - crash the actor
this.emitTraceEvent(
"actor.crash",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,27 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) {
expect(next.ticks).toBeGreaterThan(initial.ticks);
});

test.skipIf(driverTestConfig.skip?.sleep)(
"workflow run teardown does not wait for runStopTimeout",
async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actor = client.workflowStopTeardownActor.getOrCreate([
"workflow-stop-teardown",
]);

await actor.getTimeline();
await waitFor(driverTestConfig, 1_200);
const timeline = await actor.getTimeline();

expect(timeline.wakeAts.length).toBeGreaterThanOrEqual(2);
expect(timeline.sleepAts.length).toBeGreaterThanOrEqual(1);

const firstSleepDelayMs =
timeline.sleepAts[0] - timeline.wakeAts[0];
expect(firstSleepDelayMs).toBeLessThan(1_000);
},
);

// NOTE: Test for workflow persistence across actor sleep is complex because
// calling c.sleep() during a workflow prevents clean shutdown. The workflow
// persistence is implicitly tested by the "sleeps and resumes between ticks"
Expand Down
62 changes: 30 additions & 32 deletions rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ export function workflow<
TEvents,
TQueues
>,
) => Promise<never> {
) => Promise<void> {
const workflowInspector = createWorkflowInspectorAdapter();

async function run(
runCtx: RunContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
): Promise<never> {
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
): Promise<void> {
const actor = (
runCtx as unknown as {
[ACTOR_CONTEXT_INTERNAL_SYMBOL]?: AnyActorInstance;
Expand All @@ -83,30 +83,28 @@ export function workflow<
},
);

runCtx.abortSignal.addEventListener(
"abort",
() => {
handle.evict();
},
{ once: true },
);

runCtx.waitUntil(
handle.result
.then(() => {
// Ignore normal completion; the actor will be restarted if needed.
})
.catch((error) => {
runCtx.log.error({
msg: "workflow run failed",
error: stringifyError(error),
});
}),
);
const onAbort = () => {
handle.evict();
};
if (runCtx.abortSignal.aborted) {
onAbort();
} else {
runCtx.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}

return await new Promise<never>(() => {
// Intentionally never resolve to keep the run handler alive.
});
try {
await handle.result;
} catch (error) {
runCtx.log.error({
msg: "workflow run failed",
error: stringifyError(error),
});
throw error;
} finally {
runCtx.abortSignal.removeEventListener("abort", onAbort);
}
}

const runWithConfig = run as typeof run & {
Expand Down
Loading