Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions examples/sandbox/src/actors/workflow/workflow-fixtures.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import {
uniqueVarActor,
} from "./vars";
import {
workflowAccessActor,
workflowCounterActor,
workflowQueueActor,
workflowSleepActor,
Expand Down Expand Up @@ -151,6 +152,7 @@ export const registry = setup({
// From workflow.ts
workflowCounterActor,
workflowQueueActor,
workflowAccessActor,
workflowSleepActor,
// From actor-db-raw.ts
dbActorRaw,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Loop } from "@rivetkit/workflow-engine";
import { actor } from "@/actor/mod";
import { db } from "@/db/mod";
import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants";
import { workflow, workflowQueueName } from "@/workflow/mod";
import type { registry } from "./registry";
Comment on lines 1 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imports are not sorted according to Biome's rules. They should be sorted alphabetically. Run 'biome check --apply' to automatically fix the import sorting.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +3 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imports are not sorted alphabetically. Biome linter typically requires imports to be sorted. Reorder these imports alphabetically.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


const WORKFLOW_QUEUE_NAME = "workflow-default";

Expand Down Expand Up @@ -55,19 +57,95 @@ export const workflowQueueActor = actor({
name: "queue",
run: async (loopCtx) => {
const actorLoopCtx = loopCtx as any;
const payload = await loopCtx.listen(
const message = await loopCtx.listen(
"queue-wait",
WORKFLOW_QUEUE_NAME,
);
await loopCtx.step("store-message", async () => {
actorLoopCtx.state.received.push(payload);
actorLoopCtx.state.received.push(message.body);
await message.complete({ echo: message.body });
});
return Loop.continue(undefined);
},
});
}),
actions: {
getMessages: (c) => c.state.received,
sendAndWait: async (c, payload: unknown) => {
const client = c.client<typeof registry>();
const handle = client.workflowQueueActor.getForId(c.actorId);
return await handle.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)].send(
payload,
{ wait: true, timeout: 1_000 },
);
},
},
});

export const workflowAccessActor = actor({
db: db({
onMigrate: async (rawDb) => {
await rawDb.execute(`
CREATE TABLE IF NOT EXISTS workflow_access_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at INTEGER NOT NULL
)
`);
},
}),
state: {
outsideDbError: null as string | null,
outsideClientError: null as string | null,
insideDbCount: 0,
insideClientAvailable: false,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "access",
run: async (loopCtx) => {
const actorLoopCtx = loopCtx as any;
let outsideDbError: string | null = null;
let outsideClientError: string | null = null;

try {
// Accessing db outside a step should throw.
// biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor.
actorLoopCtx.db;
} catch (error) {
outsideDbError =
error instanceof Error ? error.message : String(error);
}

try {
actorLoopCtx.client<typeof registry>();
} catch (error) {
outsideClientError =
error instanceof Error ? error.message : String(error);
}

await loopCtx.step("access-step", async () => {
await actorLoopCtx.db.execute(
`INSERT INTO workflow_access_log (created_at) VALUES (${Date.now()})`,
);
const counts = (await actorLoopCtx.db.execute(
`SELECT COUNT(*) as count FROM workflow_access_log`,
)) as Array<{ count: number }>;
const client = actorLoopCtx.client<typeof registry>();

actorLoopCtx.state.outsideDbError = outsideDbError;
actorLoopCtx.state.outsideClientError = outsideClientError;
actorLoopCtx.state.insideDbCount = counts[0]?.count ?? 0;
actorLoopCtx.state.insideClientAvailable =
typeof client.workflowQueueActor.getForId === "function";
});

await loopCtx.sleep("idle", 25);
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,10 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
}

/** Deletes messages matching the provided IDs. Returns the IDs that were removed. */
async deleteMessagesById(ids: bigint[]): Promise<bigint[]> {
async deleteMessagesById(
ids: bigint[],
options: { resolveWaiters?: boolean } = {},
): Promise<bigint[]> {
if (ids.length === 0) {
return [];
}
Expand All @@ -431,10 +434,20 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
if (toRemove.length === 0) {
return [];
}
await this.#removeMessages(toRemove, { resolveWaiters: true });
await this.#removeMessages(toRemove, {
resolveWaiters: options.resolveWaiters ?? true,
});
return toRemove.map((entry) => entry.id);
}

/** Completes a previously removed message by resolving its waiter, if one exists. */
async completeById(messageId: bigint, response?: unknown): Promise<void> {
this.#resolveCompletionWaiter(messageId, {
status: "completed",
response,
});
}

async #drainMessages(
nameSet: Set<string>,
count: number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,55 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) {

test("consumes queue messages via workflow listen", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actor = client.workflowQueueActor.getOrCreate([
"workflow-queue",
]);
const actor = client.workflowQueueActor.getOrCreate(["workflow-queue"]);

const queueHandle =
actor.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)];
const queueHandle = actor.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)];
await queueHandle.send({ hello: "world" });

await waitFor(driverTestConfig, 200);
const messages = await actor.getMessages();
expect(messages).toEqual([{ hello: "world" }]);
});

test("sleeps and resumes between ticks", async (c) => {
test("workflow listen supports completing wait sends", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actor = client.workflowSleepActor.getOrCreate([
"workflow-sleep",
const actor = client.workflowQueueActor.getOrCreate([
"workflow-queue-wait",
]);

const result = await actor.sendAndWait({ value: 123 });
expect(result).toEqual({
status: "completed",
response: { echo: { value: 123 } },
});
});

test("db and client are step-only in workflow context", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actor = client.workflowAccessActor.getOrCreate([
"workflow-access",
]);

let state = await actor.getState();
for (let i = 0; i < 20 && state.insideDbCount === 0; i++) {
await waitFor(driverTestConfig, 50);
state = await actor.getState();
}

expect(state.outsideDbError).toBe(
"db is only available inside workflow steps",
);
expect(state.outsideClientError).toBe(
"client is only available inside workflow steps",
);
expect(state.insideDbCount).toBeGreaterThan(0);
expect(state.insideClientAvailable).toBe(true);
});

test("sleeps and resumes between ticks", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actor = client.workflowSleepActor.getOrCreate(["workflow-sleep"]);

const initial = await actor.getState();
await waitFor(driverTestConfig, 200);
const next = await actor.getState();
Expand Down
44 changes: 31 additions & 13 deletions rivetkit-typescript/packages/rivetkit/src/workflow/context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { RunContext } from "@/actor/contexts/run";
import type { AnyDatabaseProvider } from "@/actor/database";
import type { Client } from "@/client/client";
import type { Registry } from "@/registry";
import type { AnyDatabaseProvider, InferDatabaseClient } from "@/actor/database";
Comment on lines +2 to +4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imports are not sorted alphabetically. Biome linter typically requires imports to be sorted. Reorder these imports alphabetically.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

import type { WorkflowContextInterface } from "@rivetkit/workflow-engine";
import type {
BranchConfig,
Expand All @@ -8,6 +10,7 @@ import type {
LoopConfig,
LoopResult,
StepConfig,
WorkflowListenMessage,
} from "@rivetkit/workflow-engine";
import { WORKFLOW_GUARD_KV_KEY } from "./constants";

Expand Down Expand Up @@ -42,27 +45,27 @@ export class ActorWorkflowContext<
return this.#inner.abortSignal;
}

async step<T>(
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
run?: () => Promise<T>,
): Promise<T> {
async step<T>(
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
run?: () => Promise<T>,
): Promise<T> {
if (typeof nameOrConfig === "string") {
if (!run) {
throw new Error("Step run function missing");
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
Comment on lines +48 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The step method is incorrectly indented. Remove the extra indentation at the beginning of the method definition to align it with other class methods.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +48 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation is inconsistent in the step method implementation. The return statement and its content are indented more than they should be. Fix the indentation to match the surrounding code structure.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

const stepConfig = nameOrConfig as StepConfig<T>;
const config: StepConfig<T> = {
...stepConfig,
run: () => this.#withActorAccess(stepConfig.run),
};
return await this.#wrapActive(() => this.#inner.step(config));
}
}

async loop<T>(
name: string,
Expand Down Expand Up @@ -103,7 +106,10 @@ export class ActorWorkflowContext<
return this.#inner.sleepUntil(name, timestampMs);
}

listen<T>(name: string, messageName: string): Promise<T> {
listen<T>(
name: string,
messageName: string | string[],
): Promise<WorkflowListenMessage<T>> {
return this.#inner.listen(name, messageName);
}

Expand Down Expand Up @@ -212,6 +218,18 @@ export class ActorWorkflowContext<
return this.#runCtx.vars as TVars extends never ? never : TVars;
}

client<R extends Registry<any>>(): Client<R> {
this.#ensureActorAccess("client");
return this.#runCtx.client<R>();
}

get db(): TDatabase extends never ? never : InferDatabaseClient<TDatabase> {
this.#ensureActorAccess("db");
return this.#runCtx.db as TDatabase extends never
? never
: InferDatabaseClient<TDatabase>;
}

get log() {
return this.#runCtx.log;
}
Expand Down
Loading
Loading