Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { actor } from "rivetkit";
import { db } from "rivetkit/db";

type LifecycleCounts = {
create: number;
migrate: number;
cleanup: number;
};

const clientActorIds = new WeakMap<object, string>();

const createCounts = new Map<string, number>();
const migrateCounts = new Map<string, number>();
const cleanupCounts = new Map<string, number>();

function increment(map: Map<string, number>, actorId: string) {
map.set(actorId, (map.get(actorId) ?? 0) + 1);
}

function getCounts(actorId: string): LifecycleCounts {
return {
create: createCounts.get(actorId) ?? 0,
migrate: migrateCounts.get(actorId) ?? 0,
cleanup: cleanupCounts.get(actorId) ?? 0,
};
}

const baseProvider = db({
onMigrate: async (dbHandle) => {
await dbHandle.execute(`
CREATE TABLE IF NOT EXISTS lifecycle_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
value TEXT NOT NULL,
created_at INTEGER NOT NULL
)
`);
},
});

const lifecycleProvider = {
createClient: async (ctx: Parameters<typeof baseProvider.createClient>[0]) => {
const client = await baseProvider.createClient(ctx);
clientActorIds.set(client as object, ctx.actorId);
increment(createCounts, ctx.actorId);
return client;
},
onMigrate: async (client: Parameters<typeof baseProvider.onMigrate>[0]) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(migrateCounts, actorId);
}
await baseProvider.onMigrate(client);
},
onDestroy: async (client: Parameters<NonNullable<typeof baseProvider.onDestroy>>[0]) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(cleanupCounts, actorId);
}
await baseProvider.onDestroy?.(client);
},
};

const failingLifecycleProvider = {
createClient: async (ctx: Parameters<typeof baseProvider.createClient>[0]) => {
const client = await baseProvider.createClient(ctx);
clientActorIds.set(client as object, ctx.actorId);
increment(createCounts, ctx.actorId);
return client;
},
onMigrate: async (client: Parameters<typeof baseProvider.onMigrate>[0]) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(migrateCounts, actorId);
}
throw new Error("forced migrate failure");
},
onDestroy: async (client: Parameters<NonNullable<typeof baseProvider.onDestroy>>[0]) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(cleanupCounts, actorId);
}
await baseProvider.onDestroy?.(client);
},
};

export const dbLifecycle = actor({
db: lifecycleProvider,
actions: {
getActorId: (c) => c.actorId,
ping: () => "pong",
insertValue: async (c, value: string) => {
await c.db.execute(
`INSERT INTO lifecycle_data (value, created_at) VALUES ('${value}', ${Date.now()})`,
);
},
getCount: async (c) => {
const results = await c.db.execute<{ count: number }>(
`SELECT COUNT(*) as count FROM lifecycle_data`,
);
return results[0]?.count ?? 0;
},
triggerSleep: (c) => {
c.sleep();
},
triggerDestroy: (c) => {
c.destroy();
},
},
options: {
sleepTimeout: 100,
},
});

export const dbLifecycleFailing = actor({
db: failingLifecycleProvider,
actions: {
ping: () => "pong",
},
});

export const dbLifecycleObserver = actor({
actions: {
getCounts: (_c, actorId: string) => {
return getCounts(actorId);
},
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from "./action-types";
import { dbActorRaw } from "./actor-db-raw";
import { dbActorDrizzle } from "./actor-db-drizzle";
import { dbLifecycle, dbLifecycleFailing, dbLifecycleObserver } from "./db-lifecycle";
import { onStateChangeActor } from "./actor-onstatechange";
import { counterWithParams } from "./conn-params";
import { connStateActor } from "./conn-state";
Expand Down Expand Up @@ -165,10 +166,14 @@ export const registry = setup({
workflowStopTeardownActor,
// From actor-db-raw.ts
dbActorRaw,
// From actor-db-drizzle.ts
dbActorDrizzle,
// From stateless.ts
statelessActor,
// From actor-db-drizzle.ts
dbActorDrizzle,
// From db-lifecycle.ts
dbLifecycle,
dbLifecycleFailing,
dbLifecycleObserver,
// From stateless.ts
statelessActor,
// From access-control.ts
accessControlActor,
accessControlNoQueuesActor,
Expand Down
3 changes: 3 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export interface ActorDriver {
/**
* SQLite VFS instance for creating KV-backed databases.
* If not provided, the database provider will need an override.
*
* wa-sqlite's async build is not re-entrant per module instance. Drivers
* should scope this instance to a single actor when using KV-backed SQLite.
*/
sqliteVfs?: SqliteVfs;

Expand Down
123 changes: 84 additions & 39 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type SpanStatusInput,
type Traces,
} from "@rivetkit/traces";
import type { SqliteVfs } from "@rivetkit/sqlite-vfs";
import invariant from "invariant";
import type { ActorKey } from "@/actor/mod";
import type { Client } from "@/client/client";
Expand Down Expand Up @@ -169,7 +170,8 @@ export class ActorInstance<

// MARK: - Variables & Database
#vars?: V;
#db!: InferDatabaseClient<DB>;
#db?: InferDatabaseClient<DB>;
#sqliteVfs?: SqliteVfs;

// MARK: - Background Tasks
#backgroundPromises: Promise<void>[] = [];
Expand Down Expand Up @@ -475,49 +477,53 @@ export class ActorInstance<
mode,
});

// Clear sleep timeout
if (this.#sleepTimeout) {
clearTimeout(this.#sleepTimeout);
this.#sleepTimeout = undefined;
}

// Abort listeners
try {
this.#abortController.abort();
} catch { }
// Clear sleep timeout
if (this.#sleepTimeout) {
clearTimeout(this.#sleepTimeout);
this.#sleepTimeout = undefined;
}

// Wait for run handler to complete
await this.#waitForRunHandler(this.#config.options.runStopTimeout);
// Abort listeners
try {
this.#abortController.abort();
} catch { }

// Call onStop lifecycle
if (mode === "sleep") {
await this.#callOnSleep();
} else if (mode === "destroy") {
await this.#callOnDestroy();
} else {
assertUnreachable(mode);
}
// Wait for run handler to complete
await this.#waitForRunHandler(this.#config.options.runStopTimeout);

// Disconnect non-hibernatable connections
await this.#disconnectConnections();
// Call onStop lifecycle
if (mode === "sleep") {
await this.#callOnSleep();
} else if (mode === "destroy") {
await this.#callOnDestroy();
} else {
assertUnreachable(mode);
}

// Wait for background tasks
await this.#waitBackgroundPromises(
this.#config.options.waitUntilTimeout,
);
// Disconnect non-hibernatable connections
await this.#disconnectConnections();

// Clear timeouts and save state
this.#rLog.info({ msg: "clearing pending save timeouts" });
this.stateManager.clearPendingSaveTimeout();
this.#rLog.info({ msg: "saving state immediately" });
await this.stateManager.saveState({
immediate: true,
allowStoppingState: true,
});
// Wait for background tasks
await this.#waitBackgroundPromises(
this.#config.options.waitUntilTimeout,
);

// Wait for write queues
await this.stateManager.waitForPendingWrites();
await this.#scheduleManager.waitForPendingAlarmWrites();
// Clear timeouts and save state
this.#rLog.info({ msg: "clearing pending save timeouts" });
this.stateManager.clearPendingSaveTimeout();
this.#rLog.info({ msg: "saving state immediately" });
await this.stateManager.saveState({
immediate: true,
allowStoppingState: true,
});

// Wait for write queues
await this.stateManager.waitForPendingWrites();
await this.#scheduleManager.waitForPendingAlarmWrites();
} finally {
await this.#cleanupDatabase();
}
}

// MARK: - Sleep
Expand Down Expand Up @@ -1401,8 +1407,14 @@ export class ActorInstance<
return;
}

let client: InferDatabaseClient<DB> | undefined;
try {
const client = await this.#config.db.createClient({
// Every actor gets its own SqliteVfs/wa-sqlite instance. The async
// wa-sqlite build is not re-entrant, and sharing one instance across
// actors can cause cross-actor contention and runtime corruption.
this.#sqliteVfs ??= this.driver.sqliteVfs;

client = await this.#config.db.createClient({
actorId: this.#actorId,
overrideRawDatabaseClient: this.driver.overrideRawDatabaseClient
? () => this.driver.overrideRawDatabaseClient!(this.#actorId)
Expand All @@ -1415,13 +1427,24 @@ export class ActorInstance<
batchGet: (keys) => this.driver.kvBatchGet(this.#actorId, keys),
batchDelete: (keys) => this.driver.kvBatchDelete(this.#actorId, keys),
},
sqliteVfs: this.driver.sqliteVfs,
sqliteVfs: this.#sqliteVfs,
});
this.#rLog.info({ msg: "database migration starting" });
await this.#config.db.onMigrate?.(client);
this.#rLog.info({ msg: "database migration complete" });
this.#db = client;
} catch (error) {
if (client) {
try {
await this.#config.db.onDestroy?.(client);
} catch (cleanupError) {
this.#rLog.error({
msg: "database setup cleanup failed",
error: stringifyError(cleanupError),
});
}
}
this.#sqliteVfs = undefined;
if (error instanceof Error) {
this.#rLog.error({
msg: "database setup failed",
Expand All @@ -1439,6 +1462,28 @@ export class ActorInstance<
}
}

async #cleanupDatabase() {
const client = this.#db;
this.#db = undefined;
this.#sqliteVfs = undefined;

if (!client) {
return;
}
if (!("db" in this.#config) || !this.#config.db) {
return;
}

try {
await this.#config.db.onDestroy?.(client);
} catch (error) {
this.#rLog.error({
msg: "database cleanup failed",
error: stringifyError(error),
});
}
}

async #disconnectConnections() {
const promises: Promise<unknown>[] = [];
this.#rLog.debug({
Expand Down
3 changes: 2 additions & 1 deletion rivetkit-typescript/packages/rivetkit/src/db/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ export interface DatabaseProviderContext {

/**
* SQLite VFS instance for creating KV-backed databases.
* Each driver creates its own instance to avoid concurrency issues.
* This should be actor-scoped because wa-sqlite is not re-entrant per
* module instance.
*/
sqliteVfs?: SqliteVfs;
}
Expand Down
Loading
Loading