diff --git a/engine/packages/pegboard/src/actor_kv/mod.rs b/engine/packages/pegboard/src/actor_kv/mod.rs index cf57bdf78a..d47d89c44c 100644 --- a/engine/packages/pegboard/src/actor_kv/mod.rs +++ b/engine/packages/pegboard/src/actor_kv/mod.rs @@ -13,6 +13,9 @@ mod entry; mod utils; const VERSION: &str = env!("CARGO_PKG_VERSION"); + +// Keep the KV validation limits below in sync with +// rivetkit-typescript/packages/rivetkit/src/drivers/file-system/kv-limits.ts. const MAX_KEY_SIZE: usize = 2 * 1024; const MAX_VALUE_SIZE: usize = 128 * 1024; const MAX_KEYS: usize = 128; diff --git a/engine/packages/pegboard/src/actor_kv/utils.rs b/engine/packages/pegboard/src/actor_kv/utils.rs index c88b396e90..a8489fcf7b 100644 --- a/engine/packages/pegboard/src/actor_kv/utils.rs +++ b/engine/packages/pegboard/src/actor_kv/utils.rs @@ -83,7 +83,8 @@ pub fn validate_entries( for value in values { ensure!( value.len() <= MAX_VALUE_SIZE, - "value is too large (max 128 KiB)" + "value is too large (max {} KiB)", + MAX_VALUE_SIZE / 1024 ); } diff --git a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts index 255af1ac9c..b1a7f145a2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts @@ -4,7 +4,7 @@ import { type SqliteRemoteDatabase, } from "drizzle-orm/sqlite-proxy"; import type { DatabaseProvider, RawAccess } from "../config"; -import type { KvVfsOptions } from "../sqlite-vfs"; +import { AsyncMutex, createActorKvStore, toSqliteBindings } from "../shared"; export * from "./sqlite-core"; @@ -27,73 +27,12 @@ interface DatabaseFactoryConfig< migrations?: any; } -/** - * Create a KV store wrapper that uses the actor driver's KV operations - */ -function createActorKvStore(kv: { - batchPut: (entries: [Uint8Array, Uint8Array][]) => Promise; - batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; - batchDelete: (keys: Uint8Array[]) => Promise; -}): KvVfsOptions { - return { - get: async (key: Uint8Array) => { - const results = await kv.batchGet([key]); - return results[0]; - }, - getBatch: async (keys: Uint8Array[]) => { - return await kv.batchGet(keys); - }, - put: async (key: Uint8Array, value: Uint8Array) => { - await kv.batchPut([[key, value]]); - }, - putBatch: async (entries: [Uint8Array, Uint8Array][]) => { - await kv.batchPut(entries); - }, - deleteBatch: async (keys: Uint8Array[]) => { - await kv.batchDelete(keys); - }, - }; -} - -/** - * Mutex to serialize async operations on a @rivetkit/sqlite database handle. - * @rivetkit/sqlite is not safe for concurrent operations on the same handle. - */ -class DbMutex { - #locked = false; - #waiting: (() => void)[] = []; - - async acquire(): Promise { - while (this.#locked) { - await new Promise((resolve) => this.#waiting.push(resolve)); - } - this.#locked = true; - } - - release(): void { - this.#locked = false; - const next = this.#waiting.shift(); - if (next) { - next(); - } - } - - async run(fn: () => Promise): Promise { - await this.acquire(); - try { - return await fn(); - } finally { - this.release(); - } - } -} - /** * Create a sqlite-proxy async callback from a @rivetkit/sqlite Database */ function createProxyCallback( waDb: Database, - mutex: DbMutex, + mutex: AsyncMutex, isClosed: () => boolean, ) { return async ( @@ -107,12 +46,12 @@ function createProxyCallback( } if (method === "run") { - await waDb.run(sql, params); + await waDb.run(sql, toSqliteBindings(params)); return { rows: [] }; } // For all/get/values, use parameterized query - const result = await waDb.query(sql, params); + const result = await waDb.query(sql, toSqliteBindings(params)); // drizzle's mapResultRow accesses rows by column index (positional arrays) // so we return raw arrays for all methods @@ -131,7 +70,7 @@ function createProxyCallback( */ async function runInlineMigrations( waDb: Database, - mutex: DbMutex, + mutex: AsyncMutex, migrations: any, ): Promise { // Create migrations table @@ -174,8 +113,9 @@ async function runInlineMigrations( // Record migration await mutex.run(() => - waDb.exec( - `INSERT INTO __drizzle_migrations (hash, created_at) VALUES ('${entry.tag}', ${entry.when})`, + waDb.run( + "INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)", + [entry.tag, entry.when], ), ); } @@ -188,7 +128,7 @@ export function db< ): DatabaseProvider & RawAccess> { // Store the @rivetkit/sqlite Database instance alongside the drizzle client let waDbInstance: Database | null = null; - const mutex = new DbMutex(); + const mutex = new AsyncMutex(); return { createClient: async (ctx) => { @@ -226,18 +166,17 @@ export function db< ensureOpen(); if (args.length > 0) { - const result = await waDb.query(query, args); + const result = await waDb.query( + query, + toSqliteBindings(args), + ); return result.rows.map((row: unknown[]) => { const obj: Record = {}; - for ( - let i = 0; - i < result.columns.length; - i++ - ) { - obj[result.columns[i]] = row[i]; - } - return obj; - }) as TRow[]; + for (let i = 0; i < result.columns.length; i++) { + obj[result.columns[i]] = row[i]; + } + return obj; + }) as TRow[]; } // Use exec for non-parameterized queries since // @rivetkit/sqlite's query() can crash on some statements. @@ -246,17 +185,17 @@ export function db< await waDb.exec( query, (row: unknown[], columns: string[]) => { - if (!columnNames) { - columnNames = columns; - } - const obj: Record = {}; - for (let i = 0; i < row.length; i++) { - obj[columnNames[i]] = row[i]; - } - results.push(obj); - }, - ); - return results as TRow[]; + if (!columnNames) { + columnNames = columns; + } + const obj: Record = {}; + for (let i = 0; i < row.length; i++) { + obj[columnNames[i]] = row[i]; + } + results.push(obj); + }, + ); + return results as TRow[]; }); }, close: async () => { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 7521752f68..f0a053a19d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -1,5 +1,5 @@ -import type { KvVfsOptions } from "./sqlite-vfs"; import type { DatabaseProvider, RawAccess } from "./config"; +import { AsyncMutex, createActorKvStore, toSqliteBindings } from "./shared"; export type { RawAccess } from "./config"; @@ -7,34 +7,6 @@ interface DatabaseFactoryConfig { onMigrate?: (db: RawAccess) => Promise | void; } -/** - * Create a KV store wrapper that uses the actor driver's KV operations - */ -function createActorKvStore(kv: { - batchPut: (entries: [Uint8Array, Uint8Array][]) => Promise; - batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; - batchDelete: (keys: Uint8Array[]) => Promise; -}): KvVfsOptions { - return { - get: async (key: Uint8Array) => { - const results = await kv.batchGet([key]); - return results[0]; - }, - getBatch: async (keys: Uint8Array[]) => { - return await kv.batchGet(keys); - }, - put: async (key: Uint8Array, value: Uint8Array) => { - await kv.batchPut([[key, value]]); - }, - putBatch: async (entries: [Uint8Array, Uint8Array][]) => { - await kv.batchPut(entries); - }, - deleteBatch: async (keys: Uint8Array[]) => { - await kv.batchDelete(keys); - }, - }; -} - export function db({ onMigrate, }: DatabaseFactoryConfig = {}): DatabaseProvider { @@ -70,23 +42,12 @@ export function db({ const kvStore = createActorKvStore(ctx.kv); const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore); let closed = false; + const mutex = new AsyncMutex(); const ensureOpen = () => { if (closed) { throw new Error("database is closed"); } }; - let op: Promise = Promise.resolve(); - - const serialize = async (fn: () => Promise): Promise => { - // Ensure @rivetkit/sqlite calls are not concurrent. Actors can process multiple - // actions concurrently, and @rivetkit/sqlite is not re-entrant. - const next = op.then(fn, fn); - op = next.then( - () => undefined, - () => undefined, - ); - return await next; - }; return { execute: async < @@ -95,7 +56,7 @@ export function db({ query: string, ...args: unknown[] ): Promise => { - return await serialize(async () => { + return await mutex.run(async () => { ensureOpen(); // `db.exec` does not support binding `?` placeholders. @@ -104,6 +65,7 @@ export function db({ // Keep using `db.exec` for non-parameterized SQL because it // supports multi-statement migrations. if (args.length > 0) { + const bindings = toSqliteBindings(args); const token = query.trimStart().slice(0, 16).toUpperCase(); const returnsRows = token.startsWith("SELECT") || @@ -111,7 +73,7 @@ export function db({ token.startsWith("WITH"); if (returnsRows) { - const { rows, columns } = await db.query(query, args); + const { rows, columns } = await db.query(query, bindings); return rows.map((row: unknown[]) => { const rowObj: Record = {}; for (let i = 0; i < columns.length; i++) { @@ -121,7 +83,7 @@ export function db({ }) as TRow[]; } - await db.run(query, args); + await db.run(query, bindings); return [] as TRow[]; } @@ -141,7 +103,7 @@ export function db({ }); }, close: async () => { - await serialize(async () => { + await mutex.run(async () => { if (closed) { return; } diff --git a/rivetkit-typescript/packages/rivetkit/src/db/shared.ts b/rivetkit-typescript/packages/rivetkit/src/db/shared.ts new file mode 100644 index 0000000000..48d83518ea --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/db/shared.ts @@ -0,0 +1,92 @@ +import type { DatabaseProviderContext } from "./config"; +import type { Database } from "@rivetkit/sqlite-vfs"; +import type { KvVfsOptions } from "./sqlite-vfs"; + +type ActorKvOperations = DatabaseProviderContext["kv"]; +type SqliteBindings = NonNullable[1]>; + +function isSqliteBindingValue(value: unknown): boolean { + if ( + value === null || + typeof value === "number" || + typeof value === "string" || + typeof value === "bigint" || + value instanceof Uint8Array + ) { + return true; + } + + if (Array.isArray(value)) { + return value.every((item) => typeof item === "number"); + } + + return false; +} + +export function toSqliteBindings(args: unknown[]): SqliteBindings { + for (const value of args) { + if (!isSqliteBindingValue(value)) { + throw new Error( + `unsupported sqlite binding type: ${typeof value}`, + ); + } + } + + return args as SqliteBindings; +} + +/** + * Create a KV store wrapper that uses the actor driver's KV operations. + */ +export function createActorKvStore(kv: ActorKvOperations): KvVfsOptions { + return { + get: async (key: Uint8Array) => { + const results = await kv.batchGet([key]); + return results[0] ?? null; + }, + getBatch: async (keys: Uint8Array[]) => { + return await kv.batchGet(keys); + }, + put: async (key: Uint8Array, value: Uint8Array) => { + await kv.batchPut([[key, value]]); + }, + putBatch: async (entries: [Uint8Array, Uint8Array][]) => { + await kv.batchPut(entries); + }, + deleteBatch: async (keys: Uint8Array[]) => { + await kv.batchDelete(keys); + }, + }; +} + +/** + * Serialize async operations on a shared non-reentrant resource. + */ +export class AsyncMutex { + #locked = false; + #waiting: (() => void)[] = []; + + async acquire(): Promise { + while (this.#locked) { + await new Promise((resolve) => this.#waiting.push(resolve)); + } + this.#locked = true; + } + + release(): void { + this.#locked = false; + const next = this.#waiting.shift(); + if (next) { + next(); + } + } + + async run(fn: () => Promise): Promise { + await this.acquire(); + try { + return await fn(); + } finally { + this.release(); + } + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts index c404a775c7..424944f9f9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -37,6 +37,12 @@ import { type SqliteRuntime, type SqliteRuntimeDatabase, } from "./sqlite-runtime"; +import { + estimateKvSize, + validateKvEntries, + validateKvKey, + validateKvKeys, +} from "./kv-limits"; // Actor handler to track running instances @@ -1322,7 +1328,10 @@ export class FileSystemGlobalState { } throw new Error(`Actor ${actorId} state not loaded`); } + const db = this.#getOrCreateActorKvDatabase(actorId); + const totalSize = estimateKvSize(db); + validateKvEntries(entries, totalSize); this.#putKvEntriesInDb(db, entries); }); } @@ -1344,6 +1353,8 @@ export class FileSystemGlobalState { } } + validateKvKeys(keys); + const db = this.#getOrCreateActorKvDatabase(actorId); const results: (Uint8Array | null)[] = []; for (const key of keys) { @@ -1372,9 +1383,11 @@ export class FileSystemGlobalState { } throw new Error(`Actor ${actorId} state not loaded`); } + if (keys.length === 0) { return; } + validateKvKeys(keys); const db = this.#getOrCreateActorKvDatabase(actorId); db.exec("BEGIN"); @@ -1410,6 +1423,7 @@ export class FileSystemGlobalState { throw new Error(`Actor ${actorId} state not loaded`); } } + validateKvKey(prefix, "prefix key"); const db = this.#getOrCreateActorKvDatabase(actorId); const upperBound = computePrefixUpperBound(prefix); diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/kv-limits.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/kv-limits.ts new file mode 100644 index 0000000000..5f204f7224 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/kv-limits.ts @@ -0,0 +1,70 @@ +import type { SqliteRuntimeDatabase } from "./sqlite-runtime"; + +// Keep these limits in sync with engine/packages/pegboard/src/actor_kv/mod.rs. +const KV_MAX_KEY_SIZE = 2 * 1024; +const KV_MAX_VALUE_SIZE = 128 * 1024; +const KV_MAX_KEYS = 128; +const KV_MAX_PUT_PAYLOAD_SIZE = 976 * 1024; +const KV_MAX_STORAGE_SIZE = 1024 * 1024 * 1024; +const KV_KEY_WRAPPER_OVERHEAD_SIZE = 2; + +export function estimateKvSize(db: SqliteRuntimeDatabase): number { + const row = db.get<{ total: number | bigint | null }>( + "SELECT COALESCE(SUM(LENGTH(key) + LENGTH(value)), 0) AS total FROM kv", + ); + return row ? Number(row.total ?? 0) : 0; +} + +export function validateKvKey( + key: Uint8Array, + keyLabel: "key" | "prefix key" = "key", +): void { + if (key.byteLength + KV_KEY_WRAPPER_OVERHEAD_SIZE > KV_MAX_KEY_SIZE) { + throw new Error(`${keyLabel} is too long (max 2048 bytes)`); + } +} + +export function validateKvKeys(keys: Uint8Array[]): void { + if (keys.length > KV_MAX_KEYS) { + throw new Error("a maximum of 128 keys is allowed"); + } + + for (const key of keys) { + validateKvKey(key); + } +} + +export function validateKvEntries( + entries: [Uint8Array, Uint8Array][], + totalSize: number, +): void { + if (entries.length > KV_MAX_KEYS) { + throw new Error("A maximum of 128 key-value entries is allowed"); + } + + let payloadSize = 0; + for (const [key, value] of entries) { + payloadSize += + key.byteLength + KV_KEY_WRAPPER_OVERHEAD_SIZE + value.byteLength; + } + + if (payloadSize > KV_MAX_PUT_PAYLOAD_SIZE) { + throw new Error("total payload is too large (max 976 KiB)"); + } + + const storageRemaining = Math.max(0, KV_MAX_STORAGE_SIZE - totalSize); + if (payloadSize > storageRemaining) { + throw new Error( + `not enough space left in storage (${storageRemaining} bytes remaining, current payload is ${payloadSize} bytes)`, + ); + } + + for (const [key, value] of entries) { + validateKvKey(key); + if (value.byteLength > KV_MAX_VALUE_SIZE) { + throw new Error( + `value is too large (max ${KV_MAX_VALUE_SIZE / 1024} KiB)`, + ); + } + } +} diff --git a/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-vfs.test.ts b/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-vfs.test.ts index 532ec2cbad..24c75affc6 100644 --- a/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-vfs.test.ts +++ b/rivetkit-typescript/packages/sqlite-vfs-test/tests/sqlite-vfs.test.ts @@ -2,6 +2,8 @@ import { describe, expect, it } from "vitest"; import { createSqliteVfs } from "../src/backend"; import type { KvVfsOptions } from "@rivetkit/sqlite-vfs"; +const CHUNK_SIZE = 4096; + function keyToString(key: Uint8Array): string { return Buffer.from(key).toString("hex"); } @@ -36,6 +38,51 @@ function createKvStore(): KvVfsOptions { }; } +function toBlobValue(value: unknown): Uint8Array { + if (value instanceof Uint8Array) { + return new Uint8Array(value); + } + if (value instanceof ArrayBuffer) { + return new Uint8Array(value); + } + if (typeof Buffer !== "undefined" && Buffer.isBuffer(value)) { + return new Uint8Array(value); + } + throw new Error(`Expected blob value, got ${typeof value}`); +} + +function createPattern(size: number, seed = 17): Uint8Array { + const out = new Uint8Array(size); + for (let i = 0; i < out.length; i++) { + out[i] = (seed + i * 31) % 251; + } + return out; +} + +function assertBytesEqual(actual: Uint8Array, expected: Uint8Array): void { + expect(actual.length).toBe(expected.length); + for (let i = 0; i < expected.length; i++) { + if (actual[i] !== expected[i]) { + throw new Error(`byte mismatch at offset ${i}: ${actual[i]} != ${expected[i]}`); + } + } +} + +function applyPatch(base: Uint8Array, offset: number, patch: Uint8Array): Uint8Array { + const next = new Uint8Array(base); + next.set(patch, offset); + return next; +} + +async function createBlobTable(db: { exec: (sql: string) => Promise }): Promise { + await db.exec(` + CREATE TABLE IF NOT EXISTS blob_data ( + id INTEGER PRIMARY KEY, + payload BLOB NOT NULL + ) + `); +} + describe("sqlite-vfs", () => { it("persists data across VFS instances", async () => { const kvStore = createKvStore(); @@ -64,4 +111,227 @@ describe("sqlite-vfs", () => { await dbReloaded.close(); }); + + it("handles chunk boundary payload sizes", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-chunk-boundary", kvStore); + + try { + await createBlobTable(db); + const sizes = [ + CHUNK_SIZE - 1, + CHUNK_SIZE, + CHUNK_SIZE + 1, + 2 * CHUNK_SIZE - 1, + 2 * CHUNK_SIZE, + 2 * CHUNK_SIZE + 1, + 4 * CHUNK_SIZE - 1, + 4 * CHUNK_SIZE, + 4 * CHUNK_SIZE + 1, + ]; + + for (const [index, size] of sizes.entries()) { + const payload = createPattern(size, index + 7); + await db.run( + "INSERT INTO blob_data (id, payload) VALUES (?, ?)", + [index + 1, payload], + ); + + const result = await db.query( + "SELECT payload FROM blob_data WHERE id = ?", + [index + 1], + ); + expect(result.rows.length).toBe(1); + const readBack = toBlobValue(result.rows[0]?.[0]); + assertBytesEqual(readBack, payload); + } + } finally { + await db.close(); + } + }); + + it("handles unaligned overwrite across chunk boundaries", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-unaligned-overwrite", kvStore); + + try { + await createBlobTable(db); + + const initial = createPattern(3 * CHUNK_SIZE + 211, 23); + await db.run("INSERT INTO blob_data (id, payload) VALUES (?, ?)", [1, initial]); + + const patchOffset = CHUNK_SIZE - 137; + const patch = createPattern(CHUNK_SIZE + 503, 91); + const expected = applyPatch(initial, patchOffset, patch); + await db.run("UPDATE blob_data SET payload = ? WHERE id = 1", [expected]); + + const result = await db.query("SELECT payload FROM blob_data WHERE id = 1"); + expect(result.rows.length).toBe(1); + const readBack = toBlobValue(result.rows[0]?.[0]); + assertBytesEqual(readBack, expected); + } finally { + await db.close(); + } + }); + + it("supports shrink and regrow workloads", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-shrink-regrow", kvStore); + + try { + await db.exec("PRAGMA auto_vacuum = NONE"); + await createBlobTable(db); + await db.exec("DELETE FROM blob_data"); + await db.exec("VACUUM"); + + for (let i = 0; i < 40; i++) { + await db.run( + "INSERT INTO blob_data (id, payload) VALUES (?, ?)", + [i + 1, createPattern(8192, i + 11)], + ); + } + + const grown = await db.query("PRAGMA page_count"); + const grownPages = Number(grown.rows[0]?.[0] ?? 0); + expect(grownPages).toBeGreaterThan(0); + + await db.exec("DELETE FROM blob_data"); + await db.exec("VACUUM"); + const shrunk = await db.query("PRAGMA page_count"); + const shrunkPages = Number(shrunk.rows[0]?.[0] ?? 0); + expect(shrunkPages).toBeLessThanOrEqual(grownPages); + + for (let i = 0; i < 25; i++) { + await db.run( + "INSERT INTO blob_data (id, payload) VALUES (?, ?)", + [i + 100, createPattern(12288, i + 41)], + ); + } + const regrown = await db.query("PRAGMA page_count"); + const regrownPages = Number(regrown.rows[0]?.[0] ?? 0); + expect(regrownPages).toBeGreaterThan(shrunkPages); + } finally { + await db.close(); + } + }); + + it("reads sparse-like zeroblob regions as zeros", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-sparse-like", kvStore); + + try { + await createBlobTable(db); + const totalSize = 3 * CHUNK_SIZE; + const patchOffset = 2 * CHUNK_SIZE + 97; + const patch = createPattern(321, 171); + + await db.run( + "INSERT INTO blob_data (id, payload) VALUES (1, zeroblob(?))", + [totalSize], + ); + const zeroBlobResult = await db.query("SELECT payload FROM blob_data WHERE id = 1"); + const baseBlob = toBlobValue(zeroBlobResult.rows[0]?.[0]); + const expected = applyPatch(baseBlob, patchOffset, patch); + await db.run("UPDATE blob_data SET payload = ? WHERE id = 1", [expected]); + + const result = await db.query("SELECT payload FROM blob_data WHERE id = 1"); + const blob = toBlobValue(result.rows[0]?.[0]); + expect(blob.length).toBe(totalSize); + + for (let i = 0; i < patchOffset; i++) { + if (blob[i] !== 0) { + throw new Error(`expected zero at offset ${i}, got ${blob[i]}`); + } + } + for (let i = 0; i < patch.length; i++) { + expect(blob[patchOffset + i]).toBe(patch[i]); + } + } finally { + await db.close(); + } + }); + + it("handles many small writes to hot and scattered rows", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-many-small-writes", kvStore); + + try { + await db.exec( + "CREATE TABLE IF NOT EXISTS kv_like (id INTEGER PRIMARY KEY, value TEXT NOT NULL)", + ); + for (let i = 1; i <= 10; i++) { + await db.run("INSERT INTO kv_like (id, value) VALUES (?, ?)", [i, "init"]); + } + + for (let i = 0; i < 500; i++) { + const id = (i % 10) + 1; + await db.run("UPDATE kv_like SET value = ? WHERE id = ?", [ + `v-${i}`, + id, + ]); + } + + const results = await db.query("SELECT id, value FROM kv_like ORDER BY id"); + expect(results.rows.length).toBe(10); + for (let i = 0; i < results.rows.length; i++) { + const row = results.rows[i]; + expect(Number(row?.[0])).toBe(i + 1); + expect(String(row?.[1])).toMatch(/^v-\d+$/); + } + } finally { + await db.close(); + } + }); + + it("passes integrity checks after mixed workload and reopen", async () => { + const kvStore = createKvStore(); + const vfs = await createSqliteVfs(); + const db = await vfs.open("actor-integrity", kvStore); + + try { + await db.exec( + "CREATE TABLE IF NOT EXISTS integrity_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL, payload BLOB NOT NULL)", + ); + for (let i = 0; i < 150; i++) { + await db.run( + "INSERT OR REPLACE INTO integrity_data (id, value, payload) VALUES (?, ?, ?)", + [i + 1, `seed-${i}`, createPattern(2048 + (i % 7) * 97, i + 5)], + ); + } + for (let i = 0; i < 200; i++) { + const id = (i % 150) + 1; + if (i % 9 === 0) { + await db.run("DELETE FROM integrity_data WHERE id = ?", [id]); + } else { + await db.run( + "INSERT OR REPLACE INTO integrity_data (id, value, payload) VALUES (?, ?, ?)", + [ + id, + `upd-${i}`, + createPattern(1024 + (i % 11) * 131, 100 + i), + ], + ); + } + } + + const integrityBefore = await db.query("PRAGMA integrity_check"); + expect(String(integrityBefore.rows[0]?.[0]).toLowerCase()).toBe("ok"); + } finally { + await db.close(); + } + + const vfsReloaded = await createSqliteVfs(); + const dbReloaded = await vfsReloaded.open("actor-integrity", kvStore); + try { + const integrityAfter = await dbReloaded.query("PRAGMA integrity_check"); + expect(String(integrityAfter.rows[0]?.[0]).toLowerCase()).toBe("ok"); + } finally { + await dbReloaded.close(); + } + }); }); diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts index 0bd5ddd640..a0dd457850 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts @@ -22,14 +22,11 @@ import { import { readFileSync } from "node:fs"; import { createRequire } from "node:module"; import { - CHUNK_PREFIX, CHUNK_SIZE, FILE_TAG_JOURNAL, FILE_TAG_MAIN, FILE_TAG_SHM, FILE_TAG_WAL, - META_PREFIX, - SQLITE_PREFIX, getChunkKey, getMetaKey, type SqliteFileTag, @@ -82,6 +79,24 @@ interface LoadedSqliteRuntime { module: SQLiteModule; } +function isSqliteEsmFactory(value: unknown): value is SqliteEsmFactory { + return typeof value === "function"; +} + +function isSQLiteModule(value: unknown): value is SQLiteModule { + if (!value || typeof value !== "object") { + return false; + } + const candidate = value as { + UTF8ToString?: unknown; + HEAPU8?: unknown; + }; + return ( + typeof candidate.UTF8ToString === "function" && + candidate.HEAPU8 instanceof Uint8Array + ); +} + /** * Lazily load and instantiate the async SQLite module for this VFS instance. @@ -93,14 +108,20 @@ async function loadSqliteRuntime(): Promise { // Keep the module specifier assembled at runtime so TypeScript declaration // generation does not try to typecheck this deep dist import path. const sqliteModule = await import("@rivetkit/sqlite/dist/" + "wa-sqlite-async.mjs"); - const sqliteEsmFactory = sqliteModule.default as SqliteEsmFactory; + if (!isSqliteEsmFactory(sqliteModule.default)) { + throw new Error("Invalid SQLite ESM factory export"); + } + const sqliteEsmFactory = sqliteModule.default; const require = createRequire(import.meta.url); const wasmPath = require.resolve("@rivetkit/sqlite/dist/wa-sqlite-async.wasm"); const wasmBinary = readFileSync(wasmPath); const module = await sqliteEsmFactory({ wasmBinary }); + if (!isSQLiteModule(module)) { + throw new Error("Invalid SQLite runtime module"); + } return { sqlite3: Factory(module), - module: module as SQLiteModule, + module, }; } @@ -112,10 +133,6 @@ interface OpenFile { path: string; /** File kind tag used by compact key layout */ fileTag: SqliteFileTag; - /** Whether this file uses legacy filename-based chunk keys */ - useLegacyChunkKeys: boolean; - /** Encoded filename bytes for legacy key layout */ - legacyFileNameBytes?: Uint8Array; /** Precomputed metadata key */ metaKey: Uint8Array; /** File size in bytes */ @@ -229,11 +246,11 @@ export class Database { * @param sql - SQL statement with ? placeholders * @param params - Parameter values to bind */ - async run(sql: string, params?: unknown[]): Promise { + async run(sql: string, params?: SqliteBindings): Promise { await this.#sqliteMutex.run(async () => { for await (const stmt of this.#sqlite3.statements(this.#handle, sql)) { - if (params && params.length > 0) { - this.#sqlite3.bind_collection(stmt, params as SqliteBindings); + if (params) { + this.#sqlite3.bind_collection(stmt, params); } while ((await this.#sqlite3.step(stmt)) === SQLITE_ROW) { // Consume rows for statements that return results. @@ -248,13 +265,13 @@ export class Database { * @param params - Parameter values to bind * @returns Object with rows (array of arrays) and columns (column names) */ - async query(sql: string, params?: unknown[]): Promise<{ rows: unknown[][]; columns: string[] }> { + async query(sql: string, params?: SqliteBindings): Promise<{ rows: unknown[][]; columns: string[] }> { return this.#sqliteMutex.run(async () => { const rows: unknown[][] = []; let columns: string[] = []; for await (const stmt of this.#sqlite3.statements(this.#handle, sql)) { - if (params && params.length > 0) { - this.#sqlite3.bind_collection(stmt, params as SqliteBindings); + if (params) { + this.#sqlite3.bind_collection(stmt, params); } while ((await this.#sqlite3.step(stmt)) === SQLITE_ROW) { @@ -377,27 +394,28 @@ export class SqliteVfs { if (!this.#sqlite3 || !this.#sqliteSystem) { throw new Error("Failed to initialize SQLite"); } + const sqlite3 = this.#sqlite3; + const sqliteSystem = this.#sqliteSystem; // Register this filename with its KV options - this.#sqliteSystem.registerFile(fileName, options); + sqliteSystem.registerFile(fileName, options); // Open database - const db = await this.#sqliteMutex.run(async () => - this.#sqlite3!.open_v2( - fileName, - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, - this.#sqliteSystem!.name, - ), - ); + const db = await this.#sqliteMutex.run(async () => + sqlite3.open_v2( + fileName, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + sqliteSystem.name, + ), + ); // Create cleanup callback - const sqliteSystem = this.#sqliteSystem; const onClose = () => { sqliteSystem.unregisterFile(fileName); }; return new Database( - this.#sqlite3, + sqlite3, db, fileName, onClose, @@ -555,6 +573,10 @@ class SqliteSystem implements SqliteVfsRegistration { ); } + #chunkKey(file: OpenFile, chunkIndex: number): Uint8Array { + return getChunkKey(file.fileTag, chunkIndex); + } + async xOpen( _pVfs: number, zName: number, @@ -570,23 +592,10 @@ class SqliteSystem implements SqliteVfsRegistration { // Get the registered KV options for this file // For journal/wal files, use the main database's options const { options, fileTag } = this.#resolveFileOrThrow(path); - let metaKey = getMetaKey(fileTag); - let useLegacyChunkKeys = false; - let legacyFileNameBytes: Uint8Array | undefined; + const metaKey = getMetaKey(fileTag); // Get existing file size if the file exists - let sizeData = await options.get(metaKey); - if (!sizeData) { - // Backward compatibility for filename-based keys used before file tags. - const legacyMetaKey = getLegacyMetaKey(path); - const legacySizeData = await options.get(legacyMetaKey); - if (legacySizeData) { - metaKey = legacyMetaKey; - useLegacyChunkKeys = true; - legacyFileNameBytes = TEXT_ENCODER.encode(path); - sizeData = legacySizeData; - } - } + const sizeData = await options.get(metaKey); let size: number; @@ -609,8 +618,6 @@ class SqliteSystem implements SqliteVfsRegistration { this.#openFiles.set(fileId, { path, fileTag, - useLegacyChunkKeys, - legacyFileNameBytes, metaKey, size, metaDirty: false, @@ -685,11 +692,7 @@ class SqliteSystem implements SqliteVfsRegistration { // Fetch all needed chunks const chunkKeys: Uint8Array[] = []; for (let i = startChunk; i <= endChunk; i++) { - chunkKeys.push( - file.useLegacyChunkKeys - ? getLegacyChunkKey(file.legacyFileNameBytes!, i) - : getChunkKey(file.fileTag, i), - ); + chunkKeys.push(this.#chunkKey(file, i)); } const chunks = await options.getBatch(chunkKeys); @@ -795,9 +798,7 @@ class SqliteSystem implements SqliteVfsRegistration { Math.min(CHUNK_SIZE, file.size - chunkOffset), ); const needsExisting = writeStart > 0 || existingBytesInChunk > writeEnd; - const chunkKey = file.useLegacyChunkKeys - ? getLegacyChunkKey(file.legacyFileNameBytes!, i) - : getChunkKey(file.fileTag, i); + const chunkKey = this.#chunkKey(file, i); let existingChunkIndex = -1; if (needsExisting) { existingChunkIndex = chunkKeysToFetch.length; @@ -897,11 +898,7 @@ class SqliteSystem implements SqliteVfsRegistration { // Delete chunks beyond the new size const keysToDelete: Uint8Array[] = []; for (let i = lastChunkToKeep + 1; i <= lastExistingChunk; i++) { - keysToDelete.push( - file.useLegacyChunkKeys - ? getLegacyChunkKey(file.legacyFileNameBytes!, i) - : getChunkKey(file.fileTag, i), - ); + keysToDelete.push(this.#chunkKey(file, i)); } if (keysToDelete.length > 0) { @@ -910,9 +907,7 @@ class SqliteSystem implements SqliteVfsRegistration { // Truncate the last kept chunk if needed if (size > 0 && size % CHUNK_SIZE !== 0) { - const lastChunkKey = file.useLegacyChunkKeys - ? getLegacyChunkKey(file.legacyFileNameBytes!, lastChunkToKeep) - : getChunkKey(file.fileTag, lastChunkToKeep); + const lastChunkKey = this.#chunkKey(file, lastChunkToKeep); const lastChunkData = await options.get(lastChunkKey); if (lastChunkData && lastChunkData.length > size % CHUNK_SIZE) { @@ -962,22 +957,10 @@ class SqliteSystem implements SqliteVfsRegistration { */ async #delete(path: string): Promise { const { options, fileTag } = this.#resolveFileOrThrow(path); - let metaKey = getMetaKey(fileTag); - let useLegacyChunkKeys = false; - let legacyFileNameBytes: Uint8Array | undefined; + const metaKey = getMetaKey(fileTag); // Get file size to find out how many chunks to delete - let sizeData = await options.get(metaKey); - if (!sizeData) { - const legacyMetaKey = getLegacyMetaKey(path); - const legacySizeData = await options.get(legacyMetaKey); - if (legacySizeData) { - metaKey = legacyMetaKey; - useLegacyChunkKeys = true; - legacyFileNameBytes = TEXT_ENCODER.encode(path); - sizeData = legacySizeData; - } - } + const sizeData = await options.get(metaKey); if (!sizeData) { // File doesn't exist, that's OK @@ -990,11 +973,7 @@ class SqliteSystem implements SqliteVfsRegistration { const keysToDelete: Uint8Array[] = [metaKey]; const numChunks = Math.ceil(size / CHUNK_SIZE); for (let i = 0; i < numChunks; i++) { - keysToDelete.push( - useLegacyChunkKeys - ? getLegacyChunkKey(legacyFileNameBytes!, i) - : getChunkKey(fileTag, i), - ); + keysToDelete.push(getChunkKey(fileTag, i)); } await options.deleteBatch(keysToDelete); @@ -1015,9 +994,7 @@ class SqliteSystem implements SqliteVfsRegistration { } const compactMetaKey = getMetaKey(resolved.fileTag); - const compactMetaData = await resolved.options.get(compactMetaKey); - const metaData = compactMetaData ?? - await resolved.options.get(getLegacyMetaKey(path)); + const metaData = await resolved.options.get(compactMetaKey); // Set result: 1 if file exists, 0 otherwise this.#writeInt32(pResOut, metaData ? 1 : 0); @@ -1136,26 +1113,3 @@ function delegalize(lo32: number, hi32: number): number { } return (hi * UINT32_SIZE) + lo; } - -function getLegacyMetaKey(fileName: string): Uint8Array { - const fileNameBytes = TEXT_ENCODER.encode(fileName); - const key = new Uint8Array(2 + fileNameBytes.length); - key[0] = SQLITE_PREFIX; - key[1] = META_PREFIX; - key.set(fileNameBytes, 2); - return key; -} - -function getLegacyChunkKey(fileNameBytes: Uint8Array, chunkIndex: number): Uint8Array { - const key = new Uint8Array(2 + fileNameBytes.length + 1 + 4); - key[0] = SQLITE_PREFIX; - key[1] = CHUNK_PREFIX; - key.set(fileNameBytes, 2); - const offset = 2 + fileNameBytes.length; - key[offset] = 0; - key[offset + 1] = (chunkIndex >>> 24) & 0xff; - key[offset + 2] = (chunkIndex >>> 16) & 0xff; - key[offset + 3] = (chunkIndex >>> 8) & 0xff; - key[offset + 4] = chunkIndex & 0xff; - return key; -}