diff --git a/frontend/src/components/actors/actors-actor-details.tsx b/frontend/src/components/actors/actors-actor-details.tsx index ece70b89ba..7394dc04f3 100644 --- a/frontend/src/components/actors/actors-actor-details.tsx +++ b/frontend/src/components/actors/actors-actor-details.tsx @@ -127,6 +127,21 @@ export function ActorTabs({ > Connections + + + Queue + + + Workflow + )} + + {guardContent || } + + + {guardContent || } + + + {guardContent || } + { + if (!c.state.disconnectInsertEnabled) { + return; + } + + if (c.state.disconnectInsertDelayMs > 0) { + await new Promise((resolve) => + setTimeout(resolve, c.state.disconnectInsertDelayMs), + ); + } + + await c.db.execute( + `INSERT INTO test_data (value, payload, created_at) VALUES ('__disconnect__', '', ${Date.now()})`, + ); + }, actions: { + configureDisconnectInsert: (c, enabled: boolean, delayMs: number) => { + c.state.disconnectInsertEnabled = enabled; + c.state.disconnectInsertDelayMs = Math.max( + 0, + Math.floor(delayMs), + ); + }, + getDisconnectInsertCount: async (c) => { + const results = await c.db.execute<{ count: number }>( + `SELECT COUNT(*) as count FROM test_data WHERE value = '__disconnect__'`, + ); + return results[0]?.count ?? 0; + }, reset: async (c) => { await c.db.execute(`DELETE FROM test_data`); }, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts index 1eac26184e..8f1b2c7d50 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts @@ -2,6 +2,10 @@ import { actor } from "rivetkit"; import { db } from "rivetkit/db"; export const dbActorRaw = actor({ + state: { + disconnectInsertEnabled: false, + disconnectInsertDelayMs: 0, + }, db: db({ onMigrate: async (db) => { await db.execute(` @@ -14,7 +18,35 @@ export const dbActorRaw = actor({ `); }, }), + onDisconnect: async (c) => { + if (!c.state.disconnectInsertEnabled) { + return; + } + + if (c.state.disconnectInsertDelayMs > 0) { + await new Promise((resolve) => + setTimeout(resolve, c.state.disconnectInsertDelayMs), + ); + } + + await c.db.execute( + `INSERT INTO test_data (value, payload, created_at) VALUES ('__disconnect__', '', ${Date.now()})`, + ); + }, actions: { + configureDisconnectInsert: (c, enabled: boolean, delayMs: number) => { + c.state.disconnectInsertEnabled = enabled; + c.state.disconnectInsertDelayMs = Math.max( + 0, + Math.floor(delayMs), + ); + }, + getDisconnectInsertCount: async (c) => { + const results = await c.db.execute<{ count: number }>( + `SELECT COUNT(*) as count FROM test_data WHERE value = '__disconnect__'`, + ); + return results[0]?.count ?? 0; + }, reset: async (c) => { await c.db.execute(`DELETE FROM test_data`); }, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts index 4c3e20f234..93614c95d8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts @@ -260,14 +260,17 @@ export class Conn< }); } - this.#actor.connectionManager.connDisconnected(this); + try { + await this.#actor.connectionManager.connDisconnected(this); + } finally { + this[CONN_DRIVER_SYMBOL] = undefined; + } } else { this.#actor.rLog.warn({ msg: "missing connection driver state for disconnect", conn: this.id, }); + this[CONN_DRIVER_SYMBOL] = undefined; } - - this[CONN_DRIVER_SYMBOL] = undefined; } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts index 11ec88fb3d..0788d037e3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts @@ -50,6 +50,7 @@ export class ConnectionManager< > { #actor: ActorInstance; #connections = new Map>(); + #pendingDisconnectCount = 0; /** Connections that have had their state changed and need to be persisted. */ #connsWithPersistChanged = new Set(); @@ -70,6 +71,10 @@ export class ConnectionManager< return this.#connsWithPersistChanged; } + get pendingDisconnectCount(): number { + return this.#pendingDisconnectCount; + } + clearConnWithPersistChanged() { this.#connsWithPersistChanged.clear(); } @@ -307,9 +312,8 @@ export class ConnectionManager< this.#actor.eventManager.removeSubscription(eventName, conn, true); } - this.#actor.resetSleepTimer(); - this.#actor.inspector.emitter.emit("connectionsUpdated"); + this.#pendingDisconnectCount += 1; const attributes = { "rivet.conn.id": conn.id, @@ -321,7 +325,6 @@ export class ConnectionManager< attributes, ); - // Trigger disconnect try { if (this.#actor.config.onDisconnect) { const result = this.#actor.traces.withSpan(span, () => @@ -336,23 +339,9 @@ export class ConnectionManager< span, ); if (result instanceof Promise) { - result - .then(() => { - this.#actor.endTraceSpan(span, { code: "OK" }); - }) - .catch((error) => { - this.#actor.endTraceSpan(span, { - code: "ERROR", - message: stringifyError(error), - }); - this.#actor.rLog.error({ - msg: "error in `onDisconnect`", - error: stringifyError(error), - }); - }); - } else { - this.#actor.endTraceSpan(span, { code: "OK" }); + await result; } + this.#actor.endTraceSpan(span, { code: "OK" }); } else { this.#actor.emitTraceEvent( "connection.disconnect", @@ -370,29 +359,35 @@ export class ConnectionManager< msg: "error in `onDisconnect`", error: stringifyError(error), }); - } - - // Remove from connsWithPersistChanged after onDisconnect to handle any - // state changes made during the disconnect callback. Disconnected connections - // are removed from KV storage via kvBatchDelete below, not through the - // normal persist save flow, so they should not trigger persist saves. - this.#connsWithPersistChanged.delete(conn.id); - - // Remove from KV storage - if (conn.isHibernatable) { - const key = makeConnKey(conn.id); - try { - await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]); - this.#actor.rLog.debug({ - msg: "removed connection from KV", - connId: conn.id, - }); - } catch (err) { - this.#actor.rLog.error({ - msg: "kvBatchDelete failed for conn", - err: stringifyError(err), - }); + } finally { + // Remove from connsWithPersistChanged after onDisconnect to handle any + // state changes made during the disconnect callback. Disconnected connections + // are removed from KV storage via kvBatchDelete below, not through the + // normal persist save flow, so they should not trigger persist saves. + this.#connsWithPersistChanged.delete(conn.id); + + // Remove from KV storage. + if (conn.isHibernatable) { + const key = makeConnKey(conn.id); + try { + await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]); + this.#actor.rLog.debug({ + msg: "removed connection from KV", + connId: conn.id, + }); + } catch (err) { + this.#actor.rLog.error({ + msg: "kvBatchDelete failed for conn", + err: stringifyError(err), + }); + } } + + this.#pendingDisconnectCount = Math.max( + 0, + this.#pendingDisconnectCount - 1, + ); + this.#actor.resetSleepTimer(); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index d1dac6d1ce..5def66f188 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -75,6 +75,7 @@ enum CanSleep { NotReady, NotStarted, ActiveConns, + ActiveDisconnectCallbacks, ActiveHonoHttpRequests, ActiveKeepAwake, ActiveRun, @@ -1533,6 +1534,10 @@ export class ActorInstance< // } } + if (this.connectionManager.pendingDisconnectCount > 0) { + return CanSleep.ActiveDisconnectCallbacks; + } + return CanSleep.Yes; } diff --git a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts index 3829beb9c9..3582d51103 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts @@ -91,13 +91,21 @@ class DbMutex { /** * Create a sqlite-proxy async callback from a wa-sqlite Database */ -function createProxyCallback(waDb: Database, mutex: DbMutex) { +function createProxyCallback( + waDb: Database, + mutex: DbMutex, + isClosed: () => boolean, +) { return async ( sql: string, params: any[], method: "run" | "all" | "values" | "get", ): Promise<{ rows: any }> => { return mutex.run(async () => { + if (isClosed()) { + throw new Error("database is closed"); + } + if (method === "run") { await waDb.run(sql, params); return { rows: [] }; @@ -194,9 +202,15 @@ export function db< const kvStore = createActorKvStore(ctx.kv); const waDb = await ctx.sqliteVfs.open(ctx.actorId, kvStore); waDbInstance = waDb; + let closed = false; + const ensureOpen = () => { + if (closed) { + throw new Error("database is closed"); + } + }; // Create the async proxy callback - const callback = createProxyCallback(waDb, mutex); + const callback = createProxyCallback(waDb, mutex, () => closed); // Create the drizzle instance using sqlite-proxy const client = proxyDrizzle(callback, config); @@ -209,6 +223,8 @@ export function db< ...args: unknown[] ): Promise => { return mutex.run(async () => { + ensureOpen(); + if (args.length > 0) { const result = await waDb.query(query, args); return result.rows.map((row: unknown[]) => { @@ -244,8 +260,14 @@ export function db< }); }, close: async () => { - await waDb.close(); - waDbInstance = null; + await mutex.run(async () => { + if (closed) { + return; + } + closed = true; + await waDb.close(); + waDbInstance = null; + }); }, } satisfies RawAccess); }, diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index f5a894802e..8e8957bc69 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -94,6 +94,12 @@ export function db({ const kvStore = createActorKvStore(ctx.kv); const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore); + let closed = false; + const ensureOpen = () => { + if (closed) { + throw new Error("database is closed"); + } + }; return { execute: async < @@ -103,6 +109,8 @@ export function db({ ...args: unknown[] ): Promise => { return mutex.run(async () => { + ensureOpen(); + if (args.length > 0) { // Use parameterized query when args are provided const { rows, columns } = await db.query(query, args); @@ -132,7 +140,13 @@ export function db({ }); }, close: async () => { - await db.close(); + await mutex.run(async () => { + if (closed) { + return; + } + closed = true; + await db.close(); + }); }, } satisfies RawAccess; }, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts index a5b327c743..8c2566a227 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts @@ -93,6 +93,20 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { } }); + test("completes onDisconnect DB writes before sleeping", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const key = `db-${variant}-disconnect-${crypto.randomUUID()}`; + + const actor = getDbActor(client, variant).getOrCreate([key]); + await actor.reset(); + await actor.configureDisconnectInsert(true, 250); + + await waitFor(driverTestConfig, SLEEP_WAIT_MS + 250); + await actor.configureDisconnectInsert(false, 0); + + expect(await actor.getDisconnectInsertCount()).toBe(1); + }); + test("handles high-volume inserts", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const actor = getDbActor(client, variant).getOrCreate([ diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts index 323b7ebd2a..fc9cb5154f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts @@ -79,6 +79,20 @@ function createPreparedDatabaseAdapter( }; } +function configureSqliteRuntimeDatabase( + rawDb: SqliteRawDatabase, + path: string, +): void { + // Wait briefly when the database file is still being released by another + // process during restarts to reduce transient "database is locked" failures. + rawDb.exec("PRAGMA busy_timeout = 5000"); + + // WAL improves concurrent read/write behavior for file-backed databases. + if (path !== ":memory:") { + rawDb.exec("PRAGMA journal_mode = WAL"); + } +} + export function loadSqliteRuntime(): SqliteRuntime { const requireFn = getRequireFn(); const loadErrors: string[] = []; @@ -92,6 +106,7 @@ export function loadSqliteRuntime(): SqliteRuntime { kind: "bun", open: (path) => { const rawDb = new bunSqlite.Database(path); + configureSqliteRuntimeDatabase(rawDb, path); const query = (sql: string) => { if (!rawDb.query) { throw new Error("bun:sqlite database missing query method"); @@ -115,6 +130,7 @@ export function loadSqliteRuntime(): SqliteRuntime { kind: "node", open: (path) => { const rawDb = new nodeSqlite.DatabaseSync(path); + configureSqliteRuntimeDatabase(rawDb, path); const prepare = (sql: string) => { if (!rawDb.prepare) { throw new Error( @@ -147,6 +163,7 @@ export function loadSqliteRuntime(): SqliteRuntime { kind: "better-sqlite3", open: (path) => { const rawDb = new BetterSqlite3(path); + configureSqliteRuntimeDatabase(rawDb, path); const prepare = (sql: string) => { if (!rawDb.prepare) { throw new Error( diff --git a/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-lock-repro.test.ts b/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-lock-repro.test.ts new file mode 100644 index 0000000000..d9015dcf83 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-lock-repro.test.ts @@ -0,0 +1,78 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; + +interface SqliteLockError extends Error { + code?: string; + errcode?: number; + errstr?: string; +} + +const nodeMajorVersion = Number(process.versions.node.split(".")[0] ?? "0"); +const supportsNodeSqlite = Number.isFinite(nodeMajorVersion) && nodeMajorVersion >= 22; +const lockReproTest = supportsNodeSqlite ? it : it.skip; + +describe("sqlite lock repro", () => { + let tempDir: string | undefined; + + afterEach(() => { + if (tempDir) { + rmSync(tempDir, { recursive: true, force: true }); + tempDir = undefined; + } + }); + + lockReproTest( + "throws database is locked on stmt.get when another handle holds an exclusive txn", + async () => { + const { DatabaseSync } = await import("node:sqlite"); + + tempDir = mkdtempSync(join(tmpdir(), "sqlite-lock-repro-")); + const dbPath = join(tempDir, "actor.db"); + + const writer = new DatabaseSync(dbPath); + const reader = new DatabaseSync(dbPath); + + try { + writer.exec( + "CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY NOT NULL, value BLOB NOT NULL)", + ); + writer + .prepare("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)") + .run(new Uint8Array([1]), new Uint8Array([2])); + + // Prepare the statement before the lock to match the failing runtime stack. + const readerStmt = reader.prepare( + "SELECT value FROM kv WHERE key = ?", + ); + + writer.exec("BEGIN EXCLUSIVE"); + writer + .prepare("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)") + .run(new Uint8Array([3]), new Uint8Array([4])); + + let thrown: unknown; + try { + readerStmt.get(new Uint8Array([1])); + } catch (error) { + thrown = error; + } + + expect(thrown).toBeDefined(); + const sqliteError = thrown as SqliteLockError; + expect(sqliteError.code).toBe("ERR_SQLITE_ERROR"); + expect(sqliteError.errcode).toBe(5); + expect(sqliteError.errstr).toBe("database is locked"); + } finally { + try { + writer.exec("ROLLBACK"); + } catch { + // Ignore rollback failures when setup failed before BEGIN EXCLUSIVE. + } + writer.close(); + reader.close(); + } + }, + ); +});