diff --git a/.github/workflows/pkg-pr-new.yaml b/.github/workflows/pkg-pr-new.yaml index 61d775b4e7..dbcd6d0c93 100644 --- a/.github/workflows/pkg-pr-new.yaml +++ b/.github/workflows/pkg-pr-new.yaml @@ -14,5 +14,5 @@ jobs: with: node-version: '22' - run: pnpm install - - run: npx turbo build:publish -F rivetkit -F '@rivetkit/*' -F '!@rivetkit/example-registry' -F '!@rivetkit/mcp-hub' + - run: npx turbo build:publish -F rivetkit -F '@rivetkit/*' -F '!@rivetkit/mcp-hub' - run: pnpm dlx pkg-pr-new publish 'shared/typescript/*' 'engine/sdks/typescript/runner/' 'engine/sdks/typescript/runner-protocol/' 'rivetkit-typescript/packages/*' --packageManager pnpm --template './examples/*' diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index 3f4d117145..3cd294593a 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -384,18 +384,18 @@ export class WorkflowContextImpl implements WorkflowContextInterface { const stepData = existing.kind.data; - // Replay successful result - if (stepData.output !== undefined) { - this.log("debug", { msg: "replaying step from history", step: config.name, key }); - return stepData.output as T; - } - - // Check if we should retry const metadata = await loadMetadata( this.storage, this.driver, existing.id, ); + + // Replay successful result (including void steps). + if (metadata.status === "completed" || stepData.output !== undefined) { + return stepData.output as T; + } + + // Check if we should retry const maxRetries = config.maxRetries ?? DEFAULT_MAX_RETRIES; if (metadata.attempts >= maxRetries) { @@ -628,6 +628,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.markVisited(key); let entry: Entry; + let metadata: EntryMetadata | undefined; let state: S; let iteration: number; let rollbackSingleIteration = false; @@ -643,6 +644,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } const loopData = existing.kind.data; + metadata = await loadMetadata(this.storage, this.driver, existing.id); if (rollbackMode) { if (loopData.output !== undefined) { @@ -653,6 +655,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface { rollbackOutput = undefined; } + if (metadata.status === "completed") { + return loopData.output as T; + } + // Loop already completed if (loopData.output !== undefined) { return loopData.output as T; @@ -677,6 +683,13 @@ export class WorkflowContextImpl implements WorkflowContextInterface { data: { state, iteration }, }); setEntry(this.storage, location, entry); + metadata = getOrCreateMetadata(this.storage, entry.id); + } + + if (metadata) { + metadata.status = "running"; + metadata.error = undefined; + metadata.dirty = true; } // TODO: Add validation for commitInterval (must be > 0) @@ -733,6 +746,11 @@ export class WorkflowContextImpl implements WorkflowContextInterface { entry.kind.data.iteration = iteration; } entry.dirty = true; + if (metadata) { + metadata.status = "completed"; + metadata.completedAt = Date.now(); + metadata.dirty = true; + } await this.flushStorage(); await this.forgetOldIterations( diff --git a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts index a0820db091..1eee9b967d 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts @@ -103,6 +103,41 @@ for (const mode of modes) { expect(result.output).toBe("done"); }); + it("should replay void loop output on restart", async () => { + let callCount = 0; + + const workflow = async (ctx: WorkflowContextInterface) => { + return await ctx.loop("void-output", async () => { + callCount++; + return Loop.break(undefined); + }); + }; + + const result1 = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + + expect(result1.state).toBe("completed"); + expect(result1.output).toBeUndefined(); + expect(callCount).toBe(1); + + const result2 = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + + expect(result2.state).toBe("completed"); + expect(result2.output).toBeUndefined(); + expect(callCount).toBe(1); + }); + it("should resume loop from saved state", async () => { let iteration = 0; diff --git a/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts index 7d9d009aba..1fc23db948 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts @@ -99,6 +99,25 @@ for (const mode of modes) { expect(callCount).toBe(1); }); + it("should replay void step on restart", async () => { + let callCount = 0; + + const workflow = async (ctx: WorkflowContextInterface) => { + const result = await ctx.step("void-step", async () => { + callCount++; + }); + return result; + }; + + await runWorkflow("wf-1", workflow, undefined, driver, { mode }) + .result; + expect(callCount).toBe(1); + + await runWorkflow("wf-1", workflow, undefined, driver, { mode }) + .result; + expect(callCount).toBe(1); + }); + it("should execute multiple steps in sequence", async () => { const workflow = async (ctx: WorkflowContextInterface) => { const a = await ctx.step("step-a", async () => 1);