diff --git a/CLAUDE.md b/CLAUDE.md index 9b69466c3b..5c36f38b02 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -80,6 +80,10 @@ gt m ### pnpm Workspace - Use pnpm for all npm-related commands. We're using a pnpm workspace. +### SQLite Package +- Use `@rivetkit/sqlite` for SQLite WebAssembly support. +- Do not use the legacy upstream package directly. `@rivetkit/sqlite` is the maintained fork used in this repository and is sourced from `rivet-dev/wa-sqlite`. + ### RivetKit Package Resolutions The root `/package.json` contains `resolutions` that map RivetKit packages to their local workspace versions: diff --git a/examples/cloudflare-workers/README.md b/examples/cloudflare-workers/README.md index 67e75f4b94..50184ff1ef 100644 --- a/examples/cloudflare-workers/README.md +++ b/examples/cloudflare-workers/README.md @@ -16,14 +16,15 @@ npm run dev - **Cloudflare Workers integration**: Deploy Rivet Actors to Cloudflare's edge network using Durable Objects - **Edge-native execution**: Actors run at the edge for low-latency global access +- **Native Durable Object SQLite**: Actor state is persisted through Cloudflare's built-in Durable Object SQLite storage - **Built-in HTTP API**: Actors automatically exposed via HTTP endpoints - **Wrangler CLI integration**: Standard Cloudflare tooling for development and deployment ## Implementation -This example demonstrates deploying Rivet Actors to Cloudflare Workers: +This example demonstrates deploying Rivet Actors to Cloudflare Workers with native Durable Object SQLite: -- **Actor Definition** ([`src/backend/registry.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/cloudflare-workers/src/backend/registry.ts)): Shows how to set up actors for Cloudflare Workers deployment using Durable Objects +- **Actor Definition** ([`src/actors.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/cloudflare-workers/src/actors.ts)): Shows how to set up a SQLite-backed actor for Cloudflare Workers using Durable Objects ## Resources diff --git a/examples/cloudflare-workers/scripts/client.ts b/examples/cloudflare-workers/scripts/client.ts index 1a20e9c778..660d1af99c 100644 --- a/examples/cloudflare-workers/scripts/client.ts +++ b/examples/cloudflare-workers/scripts/client.ts @@ -1,35 +1,47 @@ import { createClient } from "rivetkit/client"; -import type { registry } from "../src/registry"; +import type { registry } from "../src/actors"; // Create RivetKit client -const client = createClient( - process.env.RIVET_ENDPOINT ?? "http://localhost:8787/rivet", -); +const client = createClient({ + endpoint: process.env.RIVET_ENDPOINT ?? "http://localhost:8787/api/rivet", + disableMetadataLookup: true, +}); async function main() { - console.log("🚀 Cloudflare Workers Client Demo"); + console.log("🚀 Cloudflare Workers SQLite E2E Demo"); try { - // Create counter instance - const counter = client.counter.getOrCreate("demo").connect(); - - // Increment counter - console.log("Incrementing counter 'demo'..."); - const result1 = await counter.increment(1); - console.log("New count:", result1); - - // Increment again with larger value - console.log("Incrementing counter 'demo' by 5..."); - const result2 = await counter.increment(5); - console.log("New count:", result2); - - // Create another counter - const counter2 = client.counter.getOrCreate("another"); - console.log("Incrementing counter 'another' by 10..."); - const result3 = await counter2.increment(10); - console.log("New count:", result3); - - console.log("✅ Demo completed!"); + const counterKey = "sqlite-demo"; + const counter = client.sqliteCounter.getOrCreate(counterKey); + + const initialCount = await counter.getCount(); + console.log("Initial count:", initialCount); + + const afterOne = await counter.increment(1); + console.log("After +1:", afterOne); + + const afterFive = await counter.increment(5); + console.log("After +5:", afterFive); + + const expected = initialCount + 6; + if (afterFive !== expected) { + throw new Error( + `Unexpected count after increments. Expected ${expected}, got ${afterFive}.`, + ); + } + + // Ensure the value persisted by re-resolving the actor handle. + const counterAgain = client.sqliteCounter.getOrCreate(counterKey); + const persistedCount = await counterAgain.getCount(); + console.log("Persisted count:", persistedCount); + + if (persistedCount !== expected) { + throw new Error( + `Persistence check failed. Expected ${expected}, got ${persistedCount}.`, + ); + } + + console.log("✅ SQLite E2E check passed"); } catch (error) { console.error("❌ Error:", error); process.exit(1); diff --git a/examples/cloudflare-workers/src/actors.ts b/examples/cloudflare-workers/src/actors.ts new file mode 100644 index 0000000000..753e9fe32b --- /dev/null +++ b/examples/cloudflare-workers/src/actors.ts @@ -0,0 +1,75 @@ +import { actor, setup, event } from "rivetkit"; +import { db } from "rivetkit/db"; + +const COUNTER_ROW_ID = 1; + +export const counter = actor({ + state: { count: 0 }, + events: { + newCount: event(), + }, + actions: { + increment: (c, x: number) => { + if (!Number.isFinite(x)) { + throw new Error("increment value must be a finite number"); + } + + const delta = Math.trunc(x); + c.state.count += delta; + c.broadcast("newCount", c.state.count); + return c.state.count; + }, + getCount: (c) => c.state.count, + }, +}); + +export const sqliteCounter = actor({ + db: db({ + onMigrate: async (database) => { + await database.execute(` + CREATE TABLE IF NOT EXISTS counter_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + count INTEGER NOT NULL + ) + `); + await database.execute( + "INSERT OR IGNORE INTO counter_state (id, count) VALUES (1, 0)", + ); + }, + }), + events: { + newCount: event(), + }, + actions: { + increment: async (c, x: number) => { + if (!Number.isFinite(x)) { + throw new Error("increment value must be a finite number"); + } + + const delta = Math.trunc(x); + await c.db.execute( + "UPDATE counter_state SET count = count + ? WHERE id = ?", + delta, + COUNTER_ROW_ID, + ); + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + const count = Number(rows[0]?.count ?? 0); + c.broadcast("newCount", count); + return count; + }, + getCount: async (c) => { + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + return Number(rows[0]?.count ?? 0); + }, + }, +}); + +export const registry = setup({ + use: { counter, sqliteCounter }, +}); diff --git a/examples/cloudflare-workers/src/index.ts b/examples/cloudflare-workers/src/index.ts index 48c0cc626d..bd2759ac2a 100644 --- a/examples/cloudflare-workers/src/index.ts +++ b/examples/cloudflare-workers/src/index.ts @@ -1,5 +1,5 @@ import { createHandler } from "@rivetkit/cloudflare-workers"; -import { registry } from "./registry"; +import { registry } from "./actors"; const { handler, ActorHandler } = createHandler(registry); export { handler as default, ActorHandler }; diff --git a/examples/cloudflare-workers/src/registry.ts b/examples/cloudflare-workers/src/registry.ts deleted file mode 100644 index 5939fc3512..0000000000 --- a/examples/cloudflare-workers/src/registry.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { actor, setup, event } from "rivetkit"; - -export const counter = actor({ - state: { count: 0 }, - events: { - newCount: event(), - }, - actions: { - increment: (c, x: number) => { - c.state.count += x; - c.broadcast("newCount", c.state.count); - return c.state.count; - }, - }, -}); - -export const registry = setup({ - use: { counter }, -}); diff --git a/examples/next-js/README.md b/examples/next-js/README.md index 4c9798817d..a6065c8567 100644 --- a/examples/next-js/README.md +++ b/examples/next-js/README.md @@ -23,7 +23,7 @@ npm run dev This example demonstrates minimal Next.js integration with Rivet Actors: -- **Actor Definition** ([`src/backend/registry.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/next-js/src/backend/registry.ts)): Simple counter actor integrated with Next.js App Router +- **Actor Definition** ([`src/rivet/actors.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/next-js/src/rivet/actors.ts)): Simple counter actor integrated with Next.js App Router ## Resources diff --git a/examples/next-js/scripts/connect.ts b/examples/next-js/scripts/connect.ts index 9aff34bdeb..e5b21d93ef 100644 --- a/examples/next-js/scripts/connect.ts +++ b/examples/next-js/scripts/connect.ts @@ -1,21 +1,47 @@ import { createClient } from "rivetkit/client"; -import type { registry } from "../src/rivet/registry"; +import type { registry } from "../src/rivet/actors"; async function main() { - const client = createClient( - "http://localhost:3000/api/rivet", - ); + const endpoint = process.env.RIVET_ENDPOINT ?? "http://localhost:3000/api/rivet"; + const client = createClient({ + endpoint, + }); + console.log("Using endpoint:", endpoint); - const counter = client.counter.getOrCreate().connect(); + const counterKey = ["sqlite-e2e"]; + const counter = client.sqliteCounter.getOrCreate(counterKey); - counter.on("newCount", (count: number) => console.log("Event:", count)); + const initialCount = await counter.getCount(); + console.log("Initial count:", initialCount); - while (true) { - const out = await counter.increment(1); - console.log("RPC:", out); + const afterTwo = await counter.increment(2); + console.log("After +2:", afterTwo); - await new Promise((resolve) => setTimeout(resolve, 1000)); + const afterFive = await counter.increment(3); + console.log("After +3:", afterFive); + + const expected = initialCount + 5; + if (afterFive !== expected) { + throw new Error( + `Unexpected count after increments. Expected ${expected}, got ${afterFive}.`, + ); } + + // Ensure the value persisted by re-resolving the actor handle. + const counterAgain = client.sqliteCounter.getOrCreate(counterKey); + const persistedCount = await counterAgain.getCount(); + console.log("Persisted count:", persistedCount); + + if (persistedCount !== expected) { + throw new Error( + `Persistence check failed. Expected ${expected}, got ${persistedCount}.`, + ); + } + + console.log("✅ SQLite E2E check passed"); } -main(); +main().catch((error) => { + console.error("❌ Error:", error); + process.exit(1); +}); diff --git a/examples/next-js/src/app/api/rivet/[...all]/route.ts b/examples/next-js/src/app/api/rivet/[...all]/route.ts index a9ebd3e0a7..6fb6fd5375 100644 --- a/examples/next-js/src/app/api/rivet/[...all]/route.ts +++ b/examples/next-js/src/app/api/rivet/[...all]/route.ts @@ -1,5 +1,5 @@ import { toNextHandler } from "@rivetkit/next-js"; -import { registry } from "@/rivet/registry"; +import { registry } from "@/rivet/actors"; export const maxDuration = 300; diff --git a/examples/next-js/src/components/Counter.tsx b/examples/next-js/src/components/Counter.tsx index 01032d50cf..52cec3a8e5 100644 --- a/examples/next-js/src/components/Counter.tsx +++ b/examples/next-js/src/components/Counter.tsx @@ -2,7 +2,7 @@ import { createRivetKit } from "@rivetkit/next-js/client"; import { useEffect, useState } from "react"; -import type { registry } from "../rivet/registry"; +import type { registry } from "../rivet/actors"; export const { useActor } = createRivetKit({ endpoint: process.env.NEXT_PUBLIC_RIVET_ENDPOINT ?? "http://localhost:3000/api/rivet", @@ -24,7 +24,9 @@ export function Counter() { useEffect(() => { if (counter.connection && isConnected) { - counter.connection.getCount().then(setCount); + counter.connection + .getCount() + .then((value) => setCount(value)); } }, [counter.connection, isConnected]); @@ -34,7 +36,8 @@ export function Counter() { const increment = async (amount: number) => { if (counter.connection) { - await counter.connection.increment(amount); + const nextCount = await counter.connection.increment(amount); + setCount(nextCount); } }; diff --git a/examples/next-js/src/rivet/actors.ts b/examples/next-js/src/rivet/actors.ts new file mode 100644 index 0000000000..8bf72501a5 --- /dev/null +++ b/examples/next-js/src/rivet/actors.ts @@ -0,0 +1,77 @@ +import { actor, setup, event } from "rivetkit"; +import { db } from "rivetkit/db"; + +const COUNTER_ROW_ID = 1; + +export const counter = actor({ + state: { + count: 0, + }, + events: { + newCount: event(), + }, + actions: { + increment: (c, x: number) => { + if (!Number.isFinite(x)) { + throw new Error("increment value must be a finite number"); + } + + const delta = Math.trunc(x); + c.state.count += delta; + c.broadcast("newCount", c.state.count); + return c.state.count; + }, + getCount: (c) => c.state.count, + }, +}); + +export const sqliteCounter = actor({ + db: db({ + onMigrate: async (database) => { + await database.execute(` + CREATE TABLE IF NOT EXISTS counter_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + count INTEGER NOT NULL + ) + `); + await database.execute( + "INSERT OR IGNORE INTO counter_state (id, count) VALUES (1, 0)", + ); + }, + }), + events: { + newCount: event(), + }, + actions: { + increment: async (c, x: number) => { + if (!Number.isFinite(x)) { + throw new Error("increment value must be a finite number"); + } + + const delta = Math.trunc(x); + await c.db.execute( + "UPDATE counter_state SET count = count + ? WHERE id = ?", + delta, + COUNTER_ROW_ID, + ); + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + const count = Number(rows[0]?.count ?? 0); + c.broadcast("newCount", count); + return count; + }, + getCount: async (c) => { + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + return Number(rows[0]?.count ?? 0); + }, + }, +}); + +export const registry = setup({ + use: { counter, sqliteCounter }, +}); diff --git a/examples/next-js/src/rivet/registry.ts b/examples/next-js/src/rivet/registry.ts deleted file mode 100644 index e1a8269066..0000000000 --- a/examples/next-js/src/rivet/registry.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { actor, setup, event } from "rivetkit"; - -const counter = actor({ - state: { - count: 0, - }, - events: { - newCount: event(), - }, - actions: { - increment: (c, x: number) => { - c.state.count += x; - c.broadcast("newCount", c.state.count); - return c.state.count; - }, - getCount: (c) => { - return c.state.count; - }, - }, -}); - -export const registry = setup({ - use: { counter }, -}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5dee523024..5a46b1c49f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4413,6 +4413,9 @@ importers: '@rivetkit/on-change': specifier: ^6.0.2-rc.1 version: 6.0.2-rc.1 + '@rivetkit/sqlite': + specifier: ^0.1.1 + version: 0.1.1 '@rivetkit/sqlite-vfs': specifier: workspace:* version: link:../sqlite-vfs @@ -4461,9 +4464,6 @@ importers: vbare: specifier: ^0.0.4 version: 0.0.4 - wa-sqlite: - specifier: ^1.0.0 - version: 1.0.0 zod: specifier: ^4.1.0 version: 4.1.13 @@ -4543,12 +4543,12 @@ importers: '@rivetkit/bare-ts': specifier: ^0.6.2 version: 0.6.2 + '@rivetkit/sqlite': + specifier: ^0.1.1 + version: 0.1.1 vbare: specifier: ^0.0.4 version: 0.0.4 - wa-sqlite: - specifier: ^1.0.0 - version: 1.0.0 devDependencies: '@bare-ts/tools': specifier: ^0.13.0 @@ -8843,6 +8843,9 @@ packages: resolution: {integrity: sha512-5RC9Ze/wTKqSlJvopdCgr+EfyV93+iiH8Thog0QXrl8PT1unuBNw/jadXNMtwgAxrIaCJL+JLaHQH9w7rqpMDw==} engines: {node: '>=20'} + '@rivetkit/sqlite@0.1.1': + resolution: {integrity: sha512-NE7ZBy/hQhOrWzMZFjkHX9SoXxf+ILcDvVV+mNbUYPgiy/fsDzlXdK0+JDTGnko5f4Xl6/KVCoCozz9gkwkq8A==} + '@rolldown/pluginutils@1.0.0-beta.27': resolution: {integrity: sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA==} @@ -16719,9 +16722,6 @@ packages: w3c-keyname@2.2.8: resolution: {integrity: sha512-dpojBhNsCNN7T82Tm7k26A6G9ML3NkhDsnw9n/eoxSRlVBB4CEtIQ/KTCLI2Fwf3ataSXRhYFkQi3SlnFwPvPQ==} - wa-sqlite@1.0.0: - resolution: {integrity: sha512-Kyybo5/BaJp76z7gDWGk2J6Hthl4NIPsE+swgraEjy3IY6r5zIR02wAs1OJH4XtJp1y3puj3Onp5eMGS0z7nUA==} - walker@1.0.8: resolution: {integrity: sha512-ts/8E8l5b7kY0vlWLewOkDXMmPdLcVV4GmOQLyxuSswIJsweeFZtAsMF7k1Nszz+TYBQrlYRmzOnr398y1JemQ==} @@ -21404,6 +21404,8 @@ snapshots: '@rivetkit/on-change@6.0.2-rc.1': {} + '@rivetkit/sqlite@0.1.1': {} + '@rolldown/pluginutils@1.0.0-beta.27': {} '@rollup/pluginutils@5.3.0(rollup@4.57.1)': @@ -31263,8 +31265,6 @@ snapshots: w3c-keyname@2.2.8: {} - wa-sqlite@1.0.0: {} - walker@1.0.8: dependencies: makeerror: 1.0.12 diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 556d7c15f0..9df7cf3578 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -239,7 +239,7 @@ "tar": "^7.5.0", "uuid": "^12.0.0", "vbare": "^0.0.4", - "@rivetkit/sqlite": "^0.1.0", + "@rivetkit/sqlite": "^0.1.1", "zod": "^4.1.0" }, "devDependencies": { diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts index fc9cb5154f..08efeb7af5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts @@ -1,11 +1,12 @@ import { getRequireFn } from "@/utils/node"; type SqliteRuntimeKind = "bun" | "node" | "better-sqlite3"; +type SqliteDatabaseCtor = new (path: string) => SqliteRawDatabase; interface SqliteStatement { run(...params: unknown[]): unknown; - get(...params: unknown[]): unknown; - all(...params: unknown[]): unknown[]; + get>(...params: unknown[]): T | undefined; + all>(...params: unknown[]): T[]; } interface SqliteRawDatabase { @@ -64,14 +65,14 @@ function createPreparedDatabaseAdapter( params?: readonly unknown[], ) => { const stmt = prepare(sql); - return stmt.get(...normalizeParams(params)) as T | undefined; + return stmt.get(...normalizeParams(params)); }, all: >( sql: string, params?: readonly unknown[], ) => { const stmt = prepare(sql); - return stmt.all(...normalizeParams(params)) as T[]; + return stmt.all(...normalizeParams(params)); }, close: () => { rawDb.close(); @@ -99,20 +100,17 @@ export function loadSqliteRuntime(): SqliteRuntime { try { const bunSqlite = requireFn(/* webpackIgnore: true */ "bun:sqlite") as { - Database: new (path: string) => SqliteRawDatabase; + Database?: SqliteDatabaseCtor; }; - if (typeof bunSqlite.Database === "function") { + const BunDatabase = bunSqlite.Database; + if (BunDatabase) { return { kind: "bun", open: (path) => { - const rawDb = new bunSqlite.Database(path); + const rawDb = new BunDatabase(path); configureSqliteRuntimeDatabase(rawDb, path); - const query = (sql: string) => { - if (!rawDb.query) { - throw new Error("bun:sqlite database missing query method"); - } - return rawDb.query(sql); - }; + const query = rawDb.query?.bind(rawDb); + if (!query) throw new Error("bun:sqlite database missing query method"); return createPreparedDatabaseAdapter(rawDb, query); }, }; @@ -123,22 +121,19 @@ export function loadSqliteRuntime(): SqliteRuntime { try { const nodeSqlite = requireFn(/* webpackIgnore: true */ "node:sqlite") as { - DatabaseSync: new (path: string) => SqliteRawDatabase; + DatabaseSync?: SqliteDatabaseCtor; }; - if (typeof nodeSqlite.DatabaseSync === "function") { + const NodeDatabaseSync = nodeSqlite.DatabaseSync; + if (NodeDatabaseSync) { return { kind: "node", open: (path) => { - const rawDb = new nodeSqlite.DatabaseSync(path); + const rawDb = new NodeDatabaseSync(path); configureSqliteRuntimeDatabase(rawDb, path); - const prepare = (sql: string) => { - if (!rawDb.prepare) { - throw new Error( - "node:sqlite DatabaseSync missing prepare method", - ); - } - return rawDb.prepare(sql); - }; + const prepare = rawDb.prepare?.bind(rawDb); + if (!prepare) { + throw new Error("node:sqlite DatabaseSync missing prepare method"); + } return createPreparedDatabaseAdapter(rawDb, prepare); }, }; @@ -150,28 +145,21 @@ export function loadSqliteRuntime(): SqliteRuntime { try { const betterSqlite3Module = requireFn( /* webpackIgnore: true */ "better-sqlite3", - ) as { - default?: new (path: string) => SqliteRawDatabase; - } | (new (path: string) => SqliteRawDatabase); + ) as SqliteDatabaseCtor | { default?: SqliteDatabaseCtor }; const BetterSqlite3 = typeof betterSqlite3Module === "function" ? betterSqlite3Module : betterSqlite3Module.default; - - if (typeof BetterSqlite3 === "function") { + if (BetterSqlite3) { return { kind: "better-sqlite3", open: (path) => { const rawDb = new BetterSqlite3(path); configureSqliteRuntimeDatabase(rawDb, path); - const prepare = (sql: string) => { - if (!rawDb.prepare) { - throw new Error( - "better-sqlite3 database missing prepare method", - ); - } - return rawDb.prepare(sql); - }; + const prepare = rawDb.prepare?.bind(rawDb); + if (!prepare) { + throw new Error("better-sqlite3 database missing prepare method"); + } return createPreparedDatabaseAdapter(rawDb, prepare); }, }; @@ -216,8 +204,7 @@ export function ensureUint8Array( return new Uint8Array(value); } if (ArrayBuffer.isView(value)) { - const view = value as ArrayBufferView; - return new Uint8Array(view.buffer, view.byteOffset, view.byteLength); + return new Uint8Array(value.buffer, value.byteOffset, value.byteLength); } throw new Error(`SQLite row field "${fieldName}" is not binary data`); } diff --git a/rivetkit-typescript/packages/sqlite-vfs/package.json b/rivetkit-typescript/packages/sqlite-vfs/package.json index 5fb868d7d7..c6c8769670 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/package.json +++ b/rivetkit-typescript/packages/sqlite-vfs/package.json @@ -33,7 +33,7 @@ }, "dependencies": { "@rivetkit/bare-ts": "^0.6.2", - "@rivetkit/sqlite": "^0.1.0", + "@rivetkit/sqlite": "^0.1.1", "vbare": "^0.0.4" }, "devDependencies": { diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/kv.ts b/rivetkit-typescript/packages/sqlite-vfs/src/kv.ts index 3ff27a7eff..42940719c7 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/kv.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/kv.ts @@ -17,34 +17,63 @@ export const META_PREFIX = 0; /** Key prefix byte for file chunks (after SQLITE_PREFIX) */ export const CHUNK_PREFIX = 1; +/** File kind tag for the actor's main database file */ +export const FILE_TAG_MAIN = 0; + +/** File kind tag for the actor's rollback journal sidecar */ +export const FILE_TAG_JOURNAL = 1; + +/** File kind tag for the actor's WAL sidecar */ +export const FILE_TAG_WAL = 2; + +/** File kind tag for the actor's SHM sidecar */ +export const FILE_TAG_SHM = 3; + +export type SqliteFileTag = + | typeof FILE_TAG_MAIN + | typeof FILE_TAG_JOURNAL + | typeof FILE_TAG_WAL + | typeof FILE_TAG_SHM; + /** * Gets the key for file metadata - * Format: [SQLITE_PREFIX (1 byte), META_PREFIX (1 byte), filename (UTF-8 encoded)] + * Format: [SQLITE_PREFIX (1 byte), META_PREFIX (1 byte), file tag (1 byte)] */ -export function getMetaKey(fileName: string): Uint8Array { - const encoder = new TextEncoder(); - const fileNameBytes = encoder.encode(fileName); - const key = new Uint8Array(2 + fileNameBytes.length); +export function getMetaKey(fileTag: SqliteFileTag): Uint8Array { + const key = new Uint8Array(3); key[0] = SQLITE_PREFIX; key[1] = META_PREFIX; - key.set(fileNameBytes, 2); + key[2] = fileTag; return key; } /** * Gets the key for a file chunk - * Format: [SQLITE_PREFIX (1 byte), CHUNK_PREFIX (1 byte), filename (UTF-8), null separator (1 byte), chunk index (4 bytes, big-endian)] + * Format: [SQLITE_PREFIX (1 byte), CHUNK_PREFIX (1 byte), file tag (1 byte), chunk index (4 bytes, big-endian)] */ -export function getChunkKey(fileName: string, chunkIndex: number): Uint8Array { - const encoder = new TextEncoder(); - const fileNameBytes = encoder.encode(fileName); - const key = new Uint8Array(2 + fileNameBytes.length + 1 + 4); - key[0] = SQLITE_PREFIX; - key[1] = CHUNK_PREFIX; - key.set(fileNameBytes, 2); - key[2 + fileNameBytes.length] = 0; // null separator - // Encode chunk index as 32-bit unsigned integer (big-endian for proper ordering) - const view = new DataView(key.buffer); - view.setUint32(2 + fileNameBytes.length + 1, chunkIndex, false); - return key; +export function createChunkKeyFactory( + fileTag: SqliteFileTag, +): (chunkIndex: number) => Uint8Array { + const prefix = new Uint8Array(3); + prefix[0] = SQLITE_PREFIX; + prefix[1] = CHUNK_PREFIX; + prefix[2] = fileTag; + + return (chunkIndex: number): Uint8Array => { + const key = new Uint8Array(prefix.length + 4); + key.set(prefix, 0); + const offset = prefix.length; + key[offset + 0] = (chunkIndex >>> 24) & 0xff; + key[offset + 1] = (chunkIndex >>> 16) & 0xff; + key[offset + 2] = (chunkIndex >>> 8) & 0xff; + key[offset + 3] = chunkIndex & 0xff; + return key; + }; +} + +export function getChunkKey( + fileTag: SqliteFileTag, + chunkIndex: number, +): Uint8Array { + return createChunkKeyFactory(fileTag)(chunkIndex); } diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts index d667421b1e..06d80b3305 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts @@ -4,17 +4,36 @@ * This module provides a SQLite API that uses a KV-backed VFS * for storage. Each SqliteVfs instance is independent and can be * used concurrently with other instances. + * + * Keep this VFS on direct VFS.Base callbacks for minimal wrapper overhead. + * Use @rivetkit/sqlite/src/FacadeVFS.js as the reference implementation for + * callback ABI and pointer/data conversion behavior. + * This implementation is optimized for single-writer semantics because each + * actor owns one SQLite database. */ -// Note: @rivetkit/sqlite VFS.Base type definitions have incorrect types for xRead/xWrite -// The actual runtime uses Uint8Array, not the {size, value} object shown in types import * as VFS from "@rivetkit/sqlite/src/VFS.js"; - -import SQLiteESMFactory from "@rivetkit/sqlite/dist/wa-sqlite-async.mjs"; -import { Factory } from "@rivetkit/sqlite"; +import { + Factory, + SQLITE_OPEN_CREATE, + SQLITE_OPEN_READWRITE, + SQLITE_ROW, +} from "@rivetkit/sqlite"; import { readFileSync } from "node:fs"; import { createRequire } from "node:module"; -import { CHUNK_SIZE, getMetaKey, getChunkKey } from "./kv"; +import { + CHUNK_PREFIX, + CHUNK_SIZE, + FILE_TAG_JOURNAL, + FILE_TAG_MAIN, + FILE_TAG_SHM, + FILE_TAG_WAL, + META_PREFIX, + SQLITE_PREFIX, + getMetaKey, + createChunkKeyFactory, + type SqliteFileTag, +} from "./kv"; import { FILE_META_VERSIONED, CURRENT_VERSION, @@ -22,20 +41,83 @@ import { import type { FileMeta } from "../schemas/file-meta/mod"; import type { KvVfsOptions } from "./types"; +type SqliteEsmFactory = (config?: { wasmBinary?: ArrayBuffer | Uint8Array }) => Promise; +type SQLite3Api = ReturnType; +type SqliteBindings = Parameters[1]; + +interface SQLiteModule { + UTF8ToString: (ptr: number) => string; + HEAPU8: Uint8Array; +} + +const TEXT_ENCODER = new TextEncoder(); +const TEXT_DECODER = new TextDecoder(); +// libvfs captures this async/sync mask at registration time. Any VFS callback +// that returns a Promise must be listed here so SQLite uses async relays. +const SQLITE_ASYNC_METHODS = new Set([ + "xOpen", + "xClose", + "xRead", + "xWrite", + "xTruncate", + "xSync", + "xFileSize", + "xDelete", + "xAccess", +]); + +interface LoadedSqliteRuntime { + sqlite3: SQLite3Api; + module: SQLiteModule; +} + + +/** + * Lazily load and instantiate the async SQLite module for this VFS instance. + * We do this on first open so actors that do not use SQLite do not pay module + * parse and wasm initialization cost at startup, and we pass wasmBinary + * explicitly so this works consistently in both ESM and CJS bundles. + */ +async function loadSqliteRuntime(): Promise { + // Keep the module specifier assembled at runtime so TypeScript declaration + // generation does not try to typecheck this deep dist import path. + const sqliteModule = await import("@rivetkit/sqlite/dist/" + "wa-sqlite-async.mjs"); + const sqliteEsmFactory = sqliteModule.default as SqliteEsmFactory; + const require = createRequire(import.meta.url); + const wasmPath = require.resolve("@rivetkit/sqlite/dist/wa-sqlite-async.wasm"); + const wasmBinary = readFileSync(wasmPath); + const module = await sqliteEsmFactory({ wasmBinary }); + return { + sqlite3: Factory(module), + module: module as SQLiteModule, + }; +} + /** * Represents an open file */ interface OpenFile { /** File path */ path: string; + /** Precomputed metadata key */ + metaKey: Uint8Array; + /** Fast key builder that reuses encoded filename prefix */ + chunkKeyForIndex: (chunkIndex: number) => Uint8Array; /** File size in bytes */ size: number; + /** True when in-memory size has not been persisted yet */ + metaDirty: boolean; /** Open flags */ flags: number; /** KV options for this file */ options: KvVfsOptions; } +interface ResolvedFile { + options: KvVfsOptions; + fileTag: SqliteFileTag; +} + /** * Encodes file metadata to a Uint8Array using BARE schema */ @@ -55,37 +137,6 @@ function decodeFileMeta(data: Uint8Array): number { return Number(meta.size); } -/** - * SQLite API interface (subset needed for VFS registration) - * This is part of @rivetkit/sqlite but not exported in TypeScript types - */ -interface SQLite3Api { - vfs_register: (vfs: unknown, makeDefault?: boolean) => number; - open_v2: ( - filename: string, - flags: number, - vfsName?: string, - ) => Promise; - close: (db: number) => Promise; - exec: ( - db: number, - sql: string, - callback?: (row: unknown[], columns: string[]) => void, - ) => Promise; - run: ( - db: number, - sql: string, - params: unknown[] | null, - ) => Promise; - execWithParams: ( - db: number, - sql: string, - params: unknown[] | null, - ) => Promise<{ rows: unknown[][]; columns: string[] }>; - SQLITE_OPEN_READWRITE: number; - SQLITE_OPEN_CREATE: number; -} - /** * Simple async mutex for serializing database operations * @rivetkit/sqlite calls are not safe to run concurrently on one module instance @@ -160,9 +211,16 @@ export class Database { * @param params - Parameter values to bind */ async run(sql: string, params?: unknown[]): Promise { - await this.#sqliteMutex.run(async () => - this.#sqlite3.run(this.#handle, sql, params ?? null), - ); + await this.#sqliteMutex.run(async () => { + for await (const stmt of this.#sqlite3.statements(this.#handle, sql)) { + if (params && params.length > 0) { + this.#sqlite3.bind_collection(stmt, params as SqliteBindings); + } + while ((await this.#sqlite3.step(stmt)) === SQLITE_ROW) { + // Consume rows for statements that return results. + } + } + }); } /** @@ -172,9 +230,24 @@ export class Database { * @returns Object with rows (array of arrays) and columns (column names) */ async query(sql: string, params?: unknown[]): Promise<{ rows: unknown[][]; columns: string[] }> { - return this.#sqliteMutex.run(async () => - this.#sqlite3.execWithParams(this.#handle, sql, params ?? null), - ); + return this.#sqliteMutex.run(async () => { + const rows: unknown[][] = []; + let columns: string[] = []; + for await (const stmt of this.#sqlite3.statements(this.#handle, sql)) { + if (params && params.length > 0) { + this.#sqlite3.bind_collection(stmt, params as SqliteBindings); + } + + while ((await this.#sqlite3.step(stmt)) === SQLITE_ROW) { + if (columns.length === 0) { + columns = this.#sqlite3.column_names(stmt); + } + rows.push(this.#sqlite3.row(stmt)); + } + } + + return { rows, columns }; + }); } /** @@ -232,18 +305,14 @@ export class SqliteVfs { // Synchronously create the promise if not started if (!this.#initPromise) { - this.#initPromise = (async () => { - // Load WASM binary (Node.js environment) - const require = createRequire(import.meta.url); - const wasmPath = require.resolve("@rivetkit/sqlite/dist/wa-sqlite-async.wasm"); - const wasmBinary = readFileSync(wasmPath); - - // Initialize @rivetkit/sqlite module - each instance gets its own module - const module = await SQLiteESMFactory({ wasmBinary }); - this.#sqlite3 = Factory(module) as unknown as SQLite3Api; - - // Create and register VFS with unique name - this.#sqliteSystem = new SqliteSystem(this.#sqlite3, `kv-vfs-${this.#instanceId}`); + this.#initPromise = (async () => { + const { sqlite3, module } = await loadSqliteRuntime(); + this.#sqlite3 = sqlite3; + this.#sqliteSystem = new SqliteSystem( + sqlite3, + module, + `kv-vfs-${this.#instanceId}`, + ); this.#sqliteSystem.register(); })(); } @@ -273,15 +342,14 @@ export class SqliteVfs { throw new Error("Failed to initialize SQLite"); } - // Register this filename with its KV options - this.#sqliteSystem.registerFile(fileName, options); + // Register this filename with its KV options + this.#sqliteSystem.registerFile(fileName, options); - // Open database + // Open database const db = await this.#sqliteMutex.run(async () => this.#sqlite3!.open_v2( fileName, - this.#sqlite3!.SQLITE_OPEN_READWRITE | - this.#sqlite3!.SQLITE_OPEN_CREATE, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, this.#sqliteSystem!.name, ), ); @@ -292,16 +360,16 @@ export class SqliteVfs { sqliteSystem.unregisterFile(fileName); }; - return new Database( - this.#sqlite3, - db, - fileName, - onClose, - this.#sqliteMutex, - ); - } finally { - this.#openMutex.release(); - } + return new Database( + this.#sqlite3, + db, + fileName, + onClose, + this.#sqliteMutex, + ); + } finally { + this.#openMutex.release(); + } } } @@ -309,15 +377,24 @@ export class SqliteVfs { * Internal VFS implementation */ class SqliteSystem extends VFS.Base { - readonly name: string; - readonly #fileOptions: Map = new Map(); + #mainFileName: string | null = null; + #mainFileOptions: KvVfsOptions | null = null; readonly #openFiles: Map = new Map(); readonly #sqlite3: SQLite3Api; + readonly #module: SQLiteModule; + #heapDataView: DataView; + #heapDataViewBuffer: ArrayBufferLike; - constructor(sqlite3: SQLite3Api, name: string) { - super(); + constructor(sqlite3: SQLite3Api, module: SQLiteModule, name: string) { + super(name, module); this.#sqlite3 = sqlite3; - this.name = name; + this.#module = module; + this.#heapDataViewBuffer = module.HEAPU8.buffer; + this.#heapDataView = new DataView(this.#heapDataViewBuffer); + } + + hasAsyncMethod(methodName: string): boolean { + return SQLITE_ASYNC_METHODS.has(methodName); } /** @@ -331,380 +408,456 @@ class SqliteSystem extends VFS.Base { * Registers a file with its KV options (before opening) */ registerFile(fileName: string, options: KvVfsOptions): void { - this.#fileOptions.set(fileName, options); + if (!this.#mainFileName) { + this.#mainFileName = fileName; + this.#mainFileOptions = options; + return; + } + + if (this.#mainFileName !== fileName) { + throw new Error( + `SqliteSystem is actor-scoped and expects one main file. Got ${fileName}, expected ${this.#mainFileName}.`, + ); + } + + this.#mainFileOptions = options; } /** * Unregisters a file's KV options (after closing) */ unregisterFile(fileName: string): void { - this.#fileOptions.delete(fileName); + if (this.#mainFileName === fileName) { + this.#mainFileName = null; + this.#mainFileOptions = null; + } } /** - * Gets KV options for a file, handling journal/wal files by using the main database's options + * Resolve file path to the actor's main DB file or known SQLite sidecars. */ - #getOptionsForPath(path: string): KvVfsOptions | undefined { - let options = this.#fileOptions.get(path); - if (!options) { - // Try to find the main database file by removing common SQLite suffixes - const mainDbPath = path - .replace(/-journal$/, "") - .replace(/-wal$/, "") - .replace(/-shm$/, ""); - - if (mainDbPath !== path) { - options = this.#fileOptions.get(mainDbPath); - } + #resolveFile(path: string): ResolvedFile | null { + if (!this.#mainFileName || !this.#mainFileOptions) { + return null; + } + + if (path === this.#mainFileName) { + return { options: this.#mainFileOptions, fileTag: FILE_TAG_MAIN }; } - return options; + if (path === `${this.#mainFileName}-journal`) { + return { options: this.#mainFileOptions, fileTag: FILE_TAG_JOURNAL }; + } + if (path === `${this.#mainFileName}-wal`) { + return { options: this.#mainFileOptions, fileTag: FILE_TAG_WAL }; + } + if (path === `${this.#mainFileName}-shm`) { + return { options: this.#mainFileOptions, fileTag: FILE_TAG_SHM }; + } + + return null; } - /** - * Opens a file - */ - xOpen( - path: string | null, + #resolveFileOrThrow(path: string): ResolvedFile { + const resolved = this.#resolveFile(path); + if (resolved) { + return resolved; + } + + if (!this.#mainFileName) { + throw new Error(`No KV options registered for file: ${path}`); + } + + throw new Error( + `Unsupported SQLite file path ${path}. Expected one of ${this.#mainFileName}, ${this.#mainFileName}-journal, ${this.#mainFileName}-wal, ${this.#mainFileName}-shm.`, + ); + } + + async xOpen( + _pVfs: number, + zName: number, fileId: number, flags: number, - pOutFlags: DataView, - ): number { - return this.handleAsync(async () => { - if (!path) { - return VFS.SQLITE_CANTOPEN; - } + pOutFlags: number, + ): Promise { + const path = this.#decodeFilename(zName, flags); + if (!path) { + return VFS.SQLITE_CANTOPEN; + } - // Get the registered KV options for this file - // For journal/wal files, use the main database's options - const options = this.#getOptionsForPath(path); - if (!options) { - throw new Error(`No KV options registered for file: ${path}`); - } + // Get the registered KV options for this file + // For journal/wal files, use the main database's options + const { options, fileTag } = this.#resolveFileOrThrow(path); + let metaKey = getMetaKey(fileTag); + let chunkKeyForIndex = createChunkKeyFactory(fileTag); - // Get existing file size if the file exists - const metaKey = getMetaKey(path); - const sizeData = await options.get(metaKey); + // Get existing file size if the file exists + let sizeData = await options.get(metaKey); + if (!sizeData) { + // Backward compatibility for filename-based keys used before file tags. + const legacyMetaKey = getLegacyMetaKey(path); + const legacySizeData = await options.get(legacyMetaKey); + if (legacySizeData) { + metaKey = legacyMetaKey; + chunkKeyForIndex = createLegacyChunkKeyFactory(path); + sizeData = legacySizeData; + } + } - let size: number; + let size: number; + + if (sizeData) { + // File exists, use existing size + size = decodeFileMeta(sizeData); + } else if (flags & VFS.SQLITE_OPEN_CREATE) { + // File doesn't exist, create it + size = 0; + await options.put(metaKey, encodeFileMeta(size)); + } else { + // File doesn't exist and we're not creating it + return VFS.SQLITE_CANTOPEN; + } - if (sizeData) { - // File exists, use existing size - size = decodeFileMeta(sizeData); - } else if (flags & VFS.SQLITE_OPEN_CREATE) { - // File doesn't exist, create it - size = 0; - await options.put(metaKey, encodeFileMeta(size)); - } else { - // File doesn't exist and we're not creating it - return VFS.SQLITE_CANTOPEN; - } + // Store open file info with options + this.#openFiles.set(fileId, { + path, + metaKey, + chunkKeyForIndex, + size, + metaDirty: false, + flags, + options, + }); - // Store open file info with options - this.#openFiles.set(fileId, { - path, - size, - flags, - options, - }); + // Set output flags to the actual flags used. + this.#writeInt32(pOutFlags, flags); - // Set output flags - pOutFlags.setInt32(0, flags & VFS.SQLITE_OPEN_READONLY ? 1 : 0, true); + return VFS.SQLITE_OK; + } + async xClose(fileId: number): Promise { + const file = this.#openFiles.get(fileId); + if (!file) { return VFS.SQLITE_OK; - }); - } + } - /** - * Closes a file - */ - xClose(fileId: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_OK; - } + if (file.metaDirty) { + await file.options.put(file.metaKey, encodeFileMeta(file.size)); + file.metaDirty = false; + } - // Delete file if SQLITE_OPEN_DELETEONCLOSE flag was set - if (file.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) { - await this.#delete(file.path); - } + // Delete file if SQLITE_OPEN_DELETEONCLOSE flag was set + if (file.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) { + await this.#delete(file.path); + } - this.#openFiles.delete(fileId); - return VFS.SQLITE_OK; - }); + this.#openFiles.delete(fileId); + return VFS.SQLITE_OK; } - /** - * Reads data from a file - */ - // @ts-expect-error - VFS.Base types are incorrect, runtime uses Uint8Array - xRead(fileId: number, pData: Uint8Array, iOffset: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_READ; - } + async xRead( + fileId: number, + pData: number, + iAmt: number, + iOffsetLo: number, + iOffsetHi: number, + ): Promise { + if (iAmt === 0) { + return VFS.SQLITE_OK; + } - const options = file.options; - const requestedLength = pData.length; - const fileSize = file.size; + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR_READ; + } - // If offset is beyond file size, return short read with zeroed buffer - if (iOffset >= fileSize) { - pData.fill(0); - return VFS.SQLITE_IOERR_SHORT_READ; - } + const data = this.#module.HEAPU8.subarray(pData, pData + iAmt); + const options = file.options; + const requestedLength = iAmt; + const iOffset = delegalize(iOffsetLo, iOffsetHi); + const fileSize = file.size; - // Calculate which chunks we need to read - const startChunk = Math.floor(iOffset / CHUNK_SIZE); - const endChunk = Math.floor((iOffset + requestedLength - 1) / CHUNK_SIZE); + // If offset is beyond file size, return short read with zeroed buffer + if (iOffset >= fileSize) { + data.fill(0); + return VFS.SQLITE_IOERR_SHORT_READ; + } - // Fetch all needed chunks - const chunkKeys: Uint8Array[] = []; - for (let i = startChunk; i <= endChunk; i++) { - chunkKeys.push(getChunkKey(file.path, i)); - } + // Calculate which chunks we need to read + const startChunk = Math.floor(iOffset / CHUNK_SIZE); + const endChunk = Math.floor((iOffset + requestedLength - 1) / CHUNK_SIZE); - const chunks = await options.getBatch(chunkKeys); + // Fetch all needed chunks + const chunkKeys: Uint8Array[] = []; + for (let i = startChunk; i <= endChunk; i++) { + chunkKeys.push(file.chunkKeyForIndex(i)); + } - // Copy data from chunks to output buffer - for (let i = startChunk; i <= endChunk; i++) { - const chunkData = chunks[i - startChunk]; - const chunkOffset = i * CHUNK_SIZE; + const chunks = await options.getBatch(chunkKeys); - // Calculate the range within this chunk - const readStart = Math.max(0, iOffset - chunkOffset); - const readEnd = Math.min( - CHUNK_SIZE, - iOffset + requestedLength - chunkOffset, - ); + // Copy data from chunks to output buffer + for (let i = startChunk; i <= endChunk; i++) { + const chunkData = chunks[i - startChunk]; + const chunkOffset = i * CHUNK_SIZE; - if (chunkData) { - // Copy available data - const sourceStart = readStart; - const sourceEnd = Math.min(readEnd, chunkData.length); - const destStart = chunkOffset + readStart - iOffset; - - if (sourceEnd > sourceStart) { - pData.set( - chunkData.slice(sourceStart, sourceEnd), - destStart, - ); - } + // Calculate the range within this chunk + const readStart = Math.max(0, iOffset - chunkOffset); + const readEnd = Math.min( + CHUNK_SIZE, + iOffset + requestedLength - chunkOffset, + ); - // Zero-fill if chunk is smaller than expected - if (sourceEnd < readEnd) { - const zeroStart = destStart + (sourceEnd - sourceStart); - const zeroEnd = destStart + (readEnd - readStart); - pData.fill(0, zeroStart, zeroEnd); - } - } else { - // Chunk doesn't exist, zero-fill - const destStart = chunkOffset + readStart - iOffset; - const destEnd = destStart + (readEnd - readStart); - pData.fill(0, destStart, destEnd); + if (chunkData) { + // Copy available data + const sourceStart = readStart; + const sourceEnd = Math.min(readEnd, chunkData.length); + const destStart = chunkOffset + readStart - iOffset; + + if (sourceEnd > sourceStart) { + data.set(chunkData.subarray(sourceStart, sourceEnd), destStart); } - } - // If we read less than requested (past EOF), return short read - const actualBytes = Math.min(requestedLength, fileSize - iOffset); - if (actualBytes < requestedLength) { - pData.fill(0, actualBytes); - return VFS.SQLITE_IOERR_SHORT_READ; + // Zero-fill if chunk is smaller than expected + if (sourceEnd < readEnd) { + const zeroStart = destStart + (sourceEnd - sourceStart); + const zeroEnd = destStart + (readEnd - readStart); + data.fill(0, zeroStart, zeroEnd); + } + } else { + // Chunk doesn't exist, zero-fill + const destStart = chunkOffset + readStart - iOffset; + const destEnd = destStart + (readEnd - readStart); + data.fill(0, destStart, destEnd); } + } - return VFS.SQLITE_OK; - }); + // If we read less than requested (past EOF), return short read + const actualBytes = Math.min(requestedLength, fileSize - iOffset); + if (actualBytes < requestedLength) { + data.fill(0, actualBytes); + return VFS.SQLITE_IOERR_SHORT_READ; + } + + return VFS.SQLITE_OK; } - /** - * Writes data to a file - */ - // @ts-expect-error - VFS.Base types are incorrect, runtime uses Uint8Array - xWrite(fileId: number, pData: Uint8Array, iOffset: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_WRITE; - } + async xWrite( + fileId: number, + pData: number, + iAmt: number, + iOffsetLo: number, + iOffsetHi: number, + ): Promise { + if (iAmt === 0) { + return VFS.SQLITE_OK; + } - const options = file.options; - const writeLength = pData.length; + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR_WRITE; + } - // Calculate which chunks we need to modify - const startChunk = Math.floor(iOffset / CHUNK_SIZE); - const endChunk = Math.floor((iOffset + writeLength - 1) / CHUNK_SIZE); + const data = this.#module.HEAPU8.subarray(pData, pData + iAmt); + const iOffset = delegalize(iOffsetLo, iOffsetHi); + const options = file.options; + const writeLength = iAmt; + + // Calculate which chunks we need to modify + const startChunk = Math.floor(iOffset / CHUNK_SIZE); + const endChunk = Math.floor((iOffset + writeLength - 1) / CHUNK_SIZE); + + interface WritePlan { + chunkKey: Uint8Array; + chunkOffset: number; + writeStart: number; + writeEnd: number; + existingChunkIndex: number; + } - // Fetch existing chunks that we'll need to modify - const chunkKeys: Uint8Array[] = []; - for (let i = startChunk; i <= endChunk; i++) { - chunkKeys.push(getChunkKey(file.path, i)); + // Only fetch chunks where we must preserve existing prefix/suffix bytes. + const plans: WritePlan[] = []; + const chunkKeysToFetch: Uint8Array[] = []; + for (let i = startChunk; i <= endChunk; i++) { + const chunkOffset = i * CHUNK_SIZE; + const writeStart = Math.max(0, iOffset - chunkOffset); + const writeEnd = Math.min( + CHUNK_SIZE, + iOffset + writeLength - chunkOffset, + ); + const existingBytesInChunk = Math.max( + 0, + Math.min(CHUNK_SIZE, file.size - chunkOffset), + ); + const needsExisting = writeStart > 0 || existingBytesInChunk > writeEnd; + const chunkKey = file.chunkKeyForIndex(i); + let existingChunkIndex = -1; + if (needsExisting) { + existingChunkIndex = chunkKeysToFetch.length; + chunkKeysToFetch.push(chunkKey); } + plans.push({ + chunkKey, + chunkOffset, + writeStart, + writeEnd, + existingChunkIndex, + }); + } - const existingChunks = await options.getBatch(chunkKeys); - - // Prepare new chunk data - const entriesToWrite: [Uint8Array, Uint8Array][] = []; - - for (let i = startChunk; i <= endChunk; i++) { - const chunkOffset = i * CHUNK_SIZE; - const existingChunk = existingChunks[i - startChunk]; - - // Calculate the range within this chunk that we're writing - const writeStart = Math.max(0, iOffset - chunkOffset); - const writeEnd = Math.min( - CHUNK_SIZE, - iOffset + writeLength - chunkOffset, - ); - - // Calculate the size this chunk needs to be - const requiredSize = writeEnd; - - // Create new chunk data - let newChunk: Uint8Array; - if (existingChunk && existingChunk.length >= requiredSize) { - // Use existing chunk (copy it so we can modify) - newChunk = new Uint8Array(Math.max(existingChunk.length, requiredSize)); - newChunk.set(existingChunk); - } else if (existingChunk) { - // Need to expand existing chunk - newChunk = new Uint8Array(requiredSize); - newChunk.set(existingChunk); - } else { - // Create new chunk - newChunk = new Uint8Array(requiredSize); - } + const existingChunks = chunkKeysToFetch.length > 0 + ? await options.getBatch(chunkKeysToFetch) + : []; + + // Prepare new chunk data + const entriesToWrite: [Uint8Array, Uint8Array][] = []; + + for (const plan of plans) { + const existingChunk = + plan.existingChunkIndex >= 0 + ? existingChunks[plan.existingChunkIndex] + : null; + // Create new chunk data + let newChunk: Uint8Array; + if (existingChunk) { + newChunk = new Uint8Array(Math.max(existingChunk.length, plan.writeEnd)); + newChunk.set(existingChunk); + } else { + newChunk = new Uint8Array(plan.writeEnd); + } - // Copy data from input buffer to chunk - const sourceStart = chunkOffset + writeStart - iOffset; - const sourceEnd = sourceStart + (writeEnd - writeStart); - newChunk.set(pData.slice(sourceStart, sourceEnd), writeStart); + // Copy data from input buffer to chunk + const sourceStart = plan.chunkOffset + plan.writeStart - iOffset; + const sourceEnd = sourceStart + (plan.writeEnd - plan.writeStart); + newChunk.set(data.subarray(sourceStart, sourceEnd), plan.writeStart); - entriesToWrite.push([getChunkKey(file.path, i), newChunk]); - } + entriesToWrite.push([plan.chunkKey, newChunk]); + } - // Update file size if we wrote past the end - const newSize = Math.max(file.size, iOffset + writeLength); - if (newSize !== file.size) { - file.size = newSize; - entriesToWrite.push([getMetaKey(file.path), encodeFileMeta(file.size)]); - } + // Update file size if we wrote past the end + const previousSize = file.size; + const newSize = Math.max(file.size, iOffset + writeLength); + if (newSize !== previousSize) { + file.size = newSize; + file.metaDirty = true; + } + if (file.metaDirty) { + entriesToWrite.push([file.metaKey, encodeFileMeta(file.size)]); + } - // Write all chunks and metadata - await options.putBatch(entriesToWrite); + // Write all chunks and metadata + await options.putBatch(entriesToWrite); + if (file.metaDirty) { + file.metaDirty = false; + } - return VFS.SQLITE_OK; - }); + return VFS.SQLITE_OK; } - /** - * Truncates a file - */ - xTruncate(fileId: number, size: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_TRUNCATE; - } + async xTruncate( + fileId: number, + sizeLo: number, + sizeHi: number, + ): Promise { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR_TRUNCATE; + } - const options = file.options; + const size = delegalize(sizeLo, sizeHi); + const options = file.options; - // If truncating to larger size, just update metadata - if (size >= file.size) { - return VFS.SQLITE_OK; + // If truncating to larger size, just update metadata + if (size >= file.size) { + if (size > file.size) { + file.size = size; + file.metaDirty = true; + await options.put(file.metaKey, encodeFileMeta(file.size)); + file.metaDirty = false; } + return VFS.SQLITE_OK; + } - // Calculate which chunks to delete - // Note: When size=0, lastChunkToKeep = floor(-1/4096) = -1, which means - // all chunks (starting from index 0) will be deleted in the loop below. - const lastChunkToKeep = Math.floor((size - 1) / CHUNK_SIZE); - const lastExistingChunk = Math.floor((file.size - 1) / CHUNK_SIZE); + // Calculate which chunks to delete + // Note: When size=0, lastChunkToKeep = floor(-1/4096) = -1, which means + // all chunks (starting from index 0) will be deleted in the loop below. + const lastChunkToKeep = Math.floor((size - 1) / CHUNK_SIZE); + const lastExistingChunk = Math.floor((file.size - 1) / CHUNK_SIZE); - // Delete chunks beyond the new size - const keysToDelete: Uint8Array[] = []; - for (let i = lastChunkToKeep + 1; i <= lastExistingChunk; i++) { - keysToDelete.push(getChunkKey(file.path, i)); - } + // Delete chunks beyond the new size + const keysToDelete: Uint8Array[] = []; + for (let i = lastChunkToKeep + 1; i <= lastExistingChunk; i++) { + keysToDelete.push(file.chunkKeyForIndex(i)); + } - if (keysToDelete.length > 0) { - await options.deleteBatch(keysToDelete); - } + if (keysToDelete.length > 0) { + await options.deleteBatch(keysToDelete); + } - // Truncate the last kept chunk if needed - if (size > 0 && size % CHUNK_SIZE !== 0) { - const lastChunkKey = getChunkKey(file.path, lastChunkToKeep); - const lastChunkData = await options.get(lastChunkKey); + // Truncate the last kept chunk if needed + if (size > 0 && size % CHUNK_SIZE !== 0) { + const lastChunkKey = file.chunkKeyForIndex(lastChunkToKeep); + const lastChunkData = await options.get(lastChunkKey); - if (lastChunkData && lastChunkData.length > size % CHUNK_SIZE) { - const truncatedChunk = lastChunkData.slice(0, size % CHUNK_SIZE); - await options.put(lastChunkKey, truncatedChunk); - } + if (lastChunkData && lastChunkData.length > size % CHUNK_SIZE) { + const truncatedChunk = lastChunkData.subarray(0, size % CHUNK_SIZE); + await options.put(lastChunkKey, truncatedChunk); } + } - // Update file size - file.size = size; - await options.put(getMetaKey(file.path), encodeFileMeta(file.size)); + // Update file size + file.size = size; + file.metaDirty = true; + await options.put(file.metaKey, encodeFileMeta(file.size)); + file.metaDirty = false; - return VFS.SQLITE_OK; - }); + return VFS.SQLITE_OK; } - /** - * Syncs file data to storage - */ - xSync(fileId: number, _flags: number): number { - return this.handleAsync(async () => { - // KV storage is immediately durable, so sync is a no-op - // But we should ensure size is persisted - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_OK; - } - - const options = file.options; - await options.put(getMetaKey(file.path), encodeFileMeta(file.size)); + async xSync(fileId: number, _flags: number): Promise { + const file = this.#openFiles.get(fileId); + if (!file || !file.metaDirty) { return VFS.SQLITE_OK; - }); + } + + await file.options.put(file.metaKey, encodeFileMeta(file.size)); + file.metaDirty = false; + return VFS.SQLITE_OK; } - /** - * Gets the file size - */ - xFileSize(fileId: number, pSize: DataView): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_FSTAT; - } + async xFileSize(fileId: number, pSize: number): Promise { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR_FSTAT; + } - // Set size as 64-bit integer (low and high parts) - pSize.setBigInt64(0, BigInt(file.size), true); - return VFS.SQLITE_OK; - }); + // Set size as 64-bit integer. + this.#writeBigInt64(pSize, BigInt(file.size)); + return VFS.SQLITE_OK; } - /** - * Deletes a file - */ - xDelete(path: string, _syncDir: number): number { - return this.handleAsync(async () => { - await this.#delete(path); - return VFS.SQLITE_OK; - }); + async xDelete(_pVfs: number, zName: number, _syncDir: number): Promise { + await this.#delete(this.#module.UTF8ToString(zName)); + return VFS.SQLITE_OK; } /** * Internal delete implementation */ async #delete(path: string): Promise { - const options = this.#getOptionsForPath(path); - if (!options) { - throw new Error(`No KV options registered for file: ${path}`); - } + const { options, fileTag } = this.#resolveFileOrThrow(path); + let metaKey = getMetaKey(fileTag); + let chunkKeyForIndex = createChunkKeyFactory(fileTag); // Get file size to find out how many chunks to delete - const metaKey = getMetaKey(path); - const sizeData = await options.get(metaKey); + let sizeData = await options.get(metaKey); + if (!sizeData) { + const legacyMetaKey = getLegacyMetaKey(path); + const legacySizeData = await options.get(legacyMetaKey); + if (legacySizeData) { + metaKey = legacyMetaKey; + chunkKeyForIndex = createLegacyChunkKeyFactory(path); + sizeData = legacySizeData; + } + } if (!sizeData) { // File doesn't exist, that's OK @@ -717,30 +870,150 @@ class SqliteSystem extends VFS.Base { const keysToDelete: Uint8Array[] = [metaKey]; const numChunks = Math.ceil(size / CHUNK_SIZE); for (let i = 0; i < numChunks; i++) { - keysToDelete.push(getChunkKey(path, i)); + keysToDelete.push(chunkKeyForIndex(i)); } await options.deleteBatch(keysToDelete); } - /** - * Checks file accessibility - */ - xAccess(path: string, _flags: number, pResOut: DataView): number { - return this.handleAsync(async () => { - const options = this.#getOptionsForPath(path); - if (!options) { - // File not registered, doesn't exist - pResOut.setInt32(0, 0, true); - return VFS.SQLITE_OK; + async xAccess( + _pVfs: number, + zName: number, + _flags: number, + pResOut: number, + ): Promise { + const path = this.#module.UTF8ToString(zName); + const resolved = this.#resolveFile(path); + if (!resolved) { + // File not registered, doesn't exist + this.#writeInt32(pResOut, 0); + return VFS.SQLITE_OK; + } + + const compactMetaKey = getMetaKey(resolved.fileTag); + const compactMetaData = await resolved.options.get(compactMetaKey); + const metaData = compactMetaData ?? + await resolved.options.get(getLegacyMetaKey(path)); + + // Set result: 1 if file exists, 0 otherwise + this.#writeInt32(pResOut, metaData ? 1 : 0); + return VFS.SQLITE_OK; + } + + xCheckReservedLock(_fileId: number, pResOut: number): number { + // This VFS is actor-scoped with one writer, so there is no external + // reserved lock state to report. + this.#writeInt32(pResOut, 0); + return VFS.SQLITE_OK; + } + + xFullPathname(_pVfs: number, zName: number, nOut: number, zOut: number): number { + const path = this.#module.UTF8ToString(zName); + const bytes = TEXT_ENCODER.encode(path); + const out = this.#module.HEAPU8.subarray(zOut, zOut + nOut); + if (bytes.length >= out.length) { + return VFS.SQLITE_IOERR; + } + out.set(bytes, 0); + out[bytes.length] = 0; + return VFS.SQLITE_OK; + } + + #decodeFilename(zName: number, flags: number): string | null { + if (!zName) { + return null; + } + + if (flags & VFS.SQLITE_OPEN_URI) { + // Decode SQLite URI filename layout: path\0key\0value\0...\0 + let pName = zName; + let state: 1 | 2 | 3 | null = 1; + const charCodes: number[] = []; + while (state) { + const charCode = this.#module.HEAPU8[pName++]; + if (charCode) { + charCodes.push(charCode); + continue; + } + + if (!this.#module.HEAPU8[pName]) { + state = null; + } + switch (state) { + case 1: + charCodes.push("?".charCodeAt(0)); + state = 2; + break; + case 2: + charCodes.push("=".charCodeAt(0)); + state = 3; + break; + case 3: + charCodes.push("&".charCodeAt(0)); + state = 2; + break; + } } + return TEXT_DECODER.decode(new Uint8Array(charCodes)); + } - const metaKey = getMetaKey(path); - const metaData = await options.get(metaKey); + return this.#module.UTF8ToString(zName); + } - // Set result: 1 if file exists, 0 otherwise - pResOut.setInt32(0, metaData ? 1 : 0, true); - return VFS.SQLITE_OK; - }); + #heapView(): DataView { + const heapBuffer = this.#module.HEAPU8.buffer; + if (heapBuffer !== this.#heapDataViewBuffer) { + this.#heapDataViewBuffer = heapBuffer; + this.#heapDataView = new DataView(heapBuffer); + } + return this.#heapDataView; + } + + #writeInt32(pointer: number, value: number): void { + const heapByteOffset = this.#module.HEAPU8.byteOffset + pointer; + this.#heapView().setInt32(heapByteOffset, value, true); } + + #writeBigInt64(pointer: number, value: bigint): void { + const heapByteOffset = this.#module.HEAPU8.byteOffset + pointer; + this.#heapView().setBigInt64(heapByteOffset, value, true); + } +} + +/** + * Rebuild an i64 from Emscripten's legalized (lo32, hi32) pair. + * SQLite passes file offsets and sizes this way, so we normalize negative lo32 + * back to unsigned low-word form to keep >32-bit chunk addressing correct. + */ +function delegalize(lo32: number, hi32: number): number { + return (hi32 * 0x100000000) + lo32 + (lo32 < 0 ? 2 ** 32 : 0); +} + +function getLegacyMetaKey(fileName: string): Uint8Array { + const fileNameBytes = TEXT_ENCODER.encode(fileName); + const key = new Uint8Array(2 + fileNameBytes.length); + key[0] = SQLITE_PREFIX; + key[1] = META_PREFIX; + key.set(fileNameBytes, 2); + return key; +} + +function createLegacyChunkKeyFactory(fileName: string): (chunkIndex: number) => Uint8Array { + const fileNameBytes = TEXT_ENCODER.encode(fileName); + const prefix = new Uint8Array(2 + fileNameBytes.length + 1); + prefix[0] = SQLITE_PREFIX; + prefix[1] = CHUNK_PREFIX; + prefix.set(fileNameBytes, 2); + prefix[prefix.length - 1] = 0; + + return (chunkIndex: number): Uint8Array => { + const key = new Uint8Array(prefix.length + 4); + key.set(prefix, 0); + const offset = prefix.length; + key[offset + 0] = (chunkIndex >>> 24) & 0xff; + key[offset + 1] = (chunkIndex >>> 16) & 0xff; + key[offset + 2] = (chunkIndex >>> 8) & 0xff; + key[offset + 3] = chunkIndex & 0xff; + return key; + }; } diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/wa-sqlite.d.ts b/rivetkit-typescript/packages/sqlite-vfs/src/wa-sqlite.d.ts deleted file mode 100644 index eeb81a13b9..0000000000 --- a/rivetkit-typescript/packages/sqlite-vfs/src/wa-sqlite.d.ts +++ /dev/null @@ -1,26 +0,0 @@ -declare module "@rivetkit/sqlite" { - export function Factory(module: any): any; -} - -declare module "@rivetkit/sqlite/src/VFS.js" { - export class Base { - handleAsync(fn: () => Promise): number; - } - - export const SQLITE_OK: number; - export const SQLITE_CANTOPEN: number; - export const SQLITE_IOERR_READ: number; - export const SQLITE_IOERR_SHORT_READ: number; - export const SQLITE_IOERR_WRITE: number; - export const SQLITE_IOERR_TRUNCATE: number; - export const SQLITE_IOERR_FSTAT: number; - export const SQLITE_OPEN_CREATE: number; - export const SQLITE_OPEN_READONLY: number; - export const SQLITE_OPEN_DELETEONCLOSE: number; - export const SQLITE_OPEN_READWRITE: number; -} - -declare module "@rivetkit/sqlite/dist/wa-sqlite-async.mjs" { - const factory: (config?: { wasmBinary?: ArrayBuffer }) => Promise; - export default factory; -} diff --git a/specs/sqlite-vfs-rust-wasm.md b/specs/sqlite-vfs-rust-wasm.md index 93b03a3f8d..324afc3146 100644 --- a/specs/sqlite-vfs-rust-wasm.md +++ b/specs/sqlite-vfs-rust-wasm.md @@ -78,7 +78,7 @@ Replace `@rivetkit/sqlite-vfs` with a Rust-backed implementation that uses embed - Do not link against system SQLite to preserve feature parity. - Wasm: - Build SQLite to wasm with async host call support to allow `getBatch` and `putBatch` to be awaited. - - Follow the same feature set as the wa-sqlite build for compatibility. + - Follow the same feature set as the @rivetkit/sqlite build for compatibility. - The sqlite-wasm module must expose the VFS registration and open/exec APIs used today. ## API and ABI Design @@ -186,5 +186,5 @@ Replace `@rivetkit/sqlite-vfs` with a Rust-backed implementation that uses embed ## Open Questions -- Which compile-time SQLite flags are enabled in wa-sqlite. These must be mirrored in both native and wasm builds and documented here once identified. +- Which compile-time SQLite flags are enabled in @rivetkit/sqlite. These must be mirrored in both native and wasm builds and documented here once identified. - Whether to expose additional debug logging flags consistent with `VFS_DEBUG`.