diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts index f010a5f6ac..0e0c5bad0b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -14,8 +14,6 @@ import { CURRENT_VERSION as FILE_SYSTEM_DRIVER_CURRENT_VERSION, } from "@/schemas/file-system-driver/versioned"; import { - arrayBuffersEqual, - bufferToArrayBuffer, type LongTimeoutHandle, promiseWithResolvers, setLongTimeout, @@ -33,6 +31,13 @@ import { ensureDirectoryExistsSync, getStoragePath, } from "./utils"; +import { + computePrefixUpperBound, + ensureUint8Array, + loadSqliteRuntime, + type SqliteRuntime, + type SqliteRuntimeDatabase, +} from "./sqlite-runtime"; // Actor handler to track running instances @@ -78,6 +83,8 @@ export interface FileSystemDriverOptions { persist?: boolean; /** Custom path for storage */ customPath?: string; + /** Deprecated option retained for explicit migration to sqlite-only KV. */ + useNativeSqlite?: boolean; } /** @@ -90,6 +97,8 @@ export class FileSystemGlobalState { #alarmsDir: string; #persist: boolean; + #sqliteRuntime: SqliteRuntime; + #actorKvDatabases = new Map(); /** SQLite VFS instance for this driver. */ readonly sqliteVfs = new SqliteVfs(); @@ -120,8 +129,14 @@ export class FileSystemGlobalState { } constructor(options: FileSystemDriverOptions = {}) { - const { persist = true, customPath } = options; + const { persist = true, customPath, useNativeSqlite = true } = options; + if (!useNativeSqlite) { + throw new Error( + "File-system driver no longer supports non-SQLite KV storage.", + ); + } this.#persist = persist; + this.#sqliteRuntime = loadSqliteRuntime(); this.#storagePath = persist ? (customPath ?? getStoragePath()) : "/tmp"; const path = getNodePath(); this.#stateDir = path.join(this.#storagePath, "state"); @@ -146,6 +161,7 @@ export class FileSystemGlobalState { msg: "file system driver ready", dir: this.#storagePath, actorCount: this.#actorCountOnStartup, + sqliteRuntime: this.#sqliteRuntime.kind, }); // Cleanup stale temp files on startup @@ -157,8 +173,21 @@ export class FileSystemGlobalState { error: err, }); } + + try { + this.#migrateLegacyKvToSqliteOnStartupSync(); + } catch (error) { + logger().error({ + msg: "failed legacy kv startup migration", + error, + }); + throw error; + } } else { - logger().debug({ msg: "memory driver ready" }); + logger().debug({ + msg: "memory driver ready", + sqliteRuntime: this.#sqliteRuntime.kind, + }); } } @@ -174,6 +203,154 @@ export class FileSystemGlobalState { return getNodePath().join(this.#alarmsDir, actorId); } + #getActorKvDatabasePath(actorId: string): string { + if (this.#persist) { + return this.getActorDbPath(actorId); + } + return ":memory:"; + } + + #ensureActorKvTables(db: SqliteRuntimeDatabase): void { + db.exec(` + CREATE TABLE IF NOT EXISTS kv ( + key BLOB PRIMARY KEY NOT NULL, + value BLOB NOT NULL + ) + `); + } + + #getOrCreateActorKvDatabase(actorId: string): SqliteRuntimeDatabase { + const existing = this.#actorKvDatabases.get(actorId); + if (existing) { + return existing; + } + + const dbPath = this.#getActorKvDatabasePath(actorId); + if (this.#persist) { + const path = getNodePath(); + ensureDirectoryExistsSync(path.dirname(dbPath)); + } + + let db: SqliteRuntimeDatabase; + try { + db = this.#sqliteRuntime.open(dbPath); + } catch (error) { + throw new Error( + `failed to open actor kv database for actor ${actorId} at ${dbPath}: ${error}`, + ); + } + + this.#ensureActorKvTables(db); + this.#actorKvDatabases.set(actorId, db); + return db; + } + + #closeActorKvDatabase(actorId: string): void { + const db = this.#actorKvDatabases.get(actorId); + if (!db) { + return; + } + + try { + db.close(); + } finally { + this.#actorKvDatabases.delete(actorId); + } + } + + #putKvEntriesInDb( + db: SqliteRuntimeDatabase, + entries: [Uint8Array, Uint8Array][], + ): void { + if (entries.length === 0) { + return; + } + + db.exec("BEGIN"); + try { + for (const [key, value] of entries) { + db.run("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)", [ + key, + value, + ]); + } + db.exec("COMMIT"); + } catch (error) { + try { + db.exec("ROLLBACK"); + } catch { + // Ignore rollback errors, original error is more actionable. + } + throw error; + } + } + + #isKvDbPopulated(db: SqliteRuntimeDatabase): boolean { + const row = db.get<{ count: number | bigint }>( + "SELECT COUNT(*) AS count FROM kv", + ); + const count = row ? Number(row.count) : 0; + return count > 0; + } + + #migrateLegacyKvToSqliteOnStartupSync(): void { + const fsSync = getNodeFsSync(); + if (!fsSync.existsSync(this.#stateDir)) { + return; + } + + const actorIds = fsSync + .readdirSync(this.#stateDir) + .filter((id) => !id.includes(".tmp.")); + + for (const actorId of actorIds) { + const statePath = this.getActorStatePath(actorId); + let state: schema.ActorState; + try { + const stateBytes = fsSync.readFileSync(statePath); + state = ACTOR_STATE_VERSIONED.deserializeWithEmbeddedVersion( + new Uint8Array(stateBytes), + ); + } catch (error) { + logger().warn({ + msg: "failed to parse actor state during startup migration", + actorId, + error, + }); + continue; + } + + if (!state.kvStorage || state.kvStorage.length === 0) { + continue; + } + + const dbPath = this.getActorDbPath(actorId); + const path = getNodePath(); + ensureDirectoryExistsSync(path.dirname(dbPath)); + const db = this.#sqliteRuntime.open(dbPath); + try { + this.#ensureActorKvTables(db); + if (this.#isKvDbPopulated(db)) { + continue; + } + + const legacyEntries = state.kvStorage.map((entry) => [ + new Uint8Array(entry.key), + new Uint8Array(entry.value), + ]) as [Uint8Array, Uint8Array][]; + this.#putKvEntriesInDb(db, legacyEntries); + + logger().info({ + msg: "migrated legacy actor kv storage to sqlite", + actorId, + entryCount: legacyEntries.length, + }); + } finally { + db.close(); + } + } + } + async *getActorsIterator(params: { cursor?: string; }): AsyncGenerator { @@ -260,15 +437,8 @@ export class FileSystemGlobalState { entry.generation = crypto.randomUUID(); } - // Initialize storage - const kvStorage: schema.ActorKvEntry[] = []; + // Initialize storage (runtime KV is stored in SQLite; state.kvStorage is legacy-only) const initialKvState = getInitialActorKvState(input); - for (const [key, value] of initialKvState) { - kvStorage.push({ - key: bufferToArrayBuffer(key), - value: bufferToArrayBuffer(value), - }); - } // Initialize metadata await this.#withActorWrite(actorId, async (lockedEntry) => { @@ -277,7 +447,7 @@ export class FileSystemGlobalState { name, key, createdAt: BigInt(Date.now()), - kvStorage, + kvStorage: [], startTs: null, connectableTs: null, sleepTs: null, @@ -291,6 +461,10 @@ export class FileSystemGlobalState { lockedEntry.state, ); } + if (initialKvState.length > 0) { + const db = this.#getOrCreateActorKvDatabase(actorId); + this.#putKvEntriesInDb(db, initialKvState); + } }); return entry; @@ -336,10 +510,16 @@ export class FileSystemGlobalState { const fs = getNodeFs(); const stateData = await fs.readFile(stateFilePath); - // Cache the loaded state in handler - entry.state = ACTOR_STATE_VERSIONED.deserializeWithEmbeddedVersion( - new Uint8Array(stateData), - ); + const loadedState = + ACTOR_STATE_VERSIONED.deserializeWithEmbeddedVersion( + new Uint8Array(stateData), + ); + + // Runtime reads/writes are SQLite-only; legacy kvStorage is for one-time startup migration. + entry.state = { + ...loadedState, + kvStorage: [], + }; return entry; } catch (innerError: any) { @@ -381,39 +561,36 @@ export class FileSystemGlobalState { entry.generation = crypto.randomUUID(); } - // Initialize kvStorage with the initial persist data - const kvStorage: schema.ActorKvEntry[] = []; - const initialKvState = getInitialActorKvState(input); - for (const [key, value] of initialKvState) { - kvStorage.push({ - key: bufferToArrayBuffer(key), - value: bufferToArrayBuffer(value), - }); - } + // Initialize storage (runtime KV is stored in SQLite; state.kvStorage is legacy-only) + const initialKvState = getInitialActorKvState(input); - await this.#withActorWrite(actorId, async (lockedEntry) => { - lockedEntry.state = { - actorId, - name, - key: key as readonly string[], - createdAt: BigInt(Date.now()), - kvStorage, - startTs: null, - connectableTs: null, - sleepTs: null, - destroyTs: null, - }; - if (this.#persist) { - await this.#performWrite( + await this.#withActorWrite(actorId, async (lockedEntry) => { + lockedEntry.state = { actorId, - lockedEntry.generation, - lockedEntry.state, - ); - } - }); + name, + key: key as readonly string[], + createdAt: BigInt(Date.now()), + kvStorage: [], + startTs: null, + connectableTs: null, + sleepTs: null, + destroyTs: null, + }; + if (this.#persist) { + await this.#performWrite( + actorId, + lockedEntry.generation, + lockedEntry.state, + ); + } + if (initialKvState.length > 0) { + const db = this.#getOrCreateActorKvDatabase(actorId); + this.#putKvEntriesInDb(db, initialKvState); + } + }); + } + return entry; } - return entry; - } async sleepActor(actorId: string) { invariant( @@ -461,11 +638,12 @@ export class FileSystemGlobalState { // Stop actor invariant(actor.actor, "actor should be loaded"); await actor.actor.onStop("sleep"); - } finally { - // Ensure any pending KV writes finish before removing the entry. - await this.#withActorWrite(actorId, async () => {}); - actor.stopPromise?.resolve(); - actor.stopPromise = undefined; + } finally { + // Ensure any pending KV writes finish before removing the entry. + await this.#withActorWrite(actorId, async () => {}); + this.#closeActorKvDatabase(actorId); + actor.stopPromise?.resolve(); + actor.stopPromise = undefined; // Remove from map after stop is complete this.#actors.delete(actorId); @@ -515,8 +693,9 @@ export class FileSystemGlobalState { await actor.actor.onStop("destroy"); } - // Ensure any pending KV writes finish before deleting files. - await this.#withActorWrite(actorId, async () => {}); + // Ensure any pending KV writes finish before deleting files. + await this.#withActorWrite(actorId, async () => {}); + this.#closeActorKvDatabase(actorId); // Clear alarm timeout if exists if (actor.alarmTimeout) { @@ -924,14 +1103,14 @@ export class FileSystemGlobalState { connectableTs: now, sleepTs: null, // Clear sleep timestamp when actor wakes up }; - if (this.#persist) { - await this.#performWrite( - actorId, - lockedEntry.generation, - lockedEntry.state, - ); - } - }); + if (this.#persist) { + await this.#performWrite( + actorId, + lockedEntry.generation, + lockedEntry.state, + ); + } + }); // Finish entry.startPromise.resolve(); @@ -1147,46 +1326,8 @@ export class FileSystemGlobalState { } throw new Error(`Actor ${actorId} state not loaded`); } - - // Create a mutable copy of kvStorage - const newKvStorage = [...entry.state.kvStorage]; - - // Update kvStorage with new entries - for (const [key, value] of entries) { - // Find existing entry with the same key - const existingIndex = newKvStorage.findIndex((e) => - arrayBuffersEqual(e.key, bufferToArrayBuffer(key)), - ); - - if (existingIndex >= 0) { - // Replace existing entry with new one - newKvStorage[existingIndex] = { - key: bufferToArrayBuffer(key), - value: bufferToArrayBuffer(value), - }; - } else { - // Add new entry - newKvStorage.push({ - key: bufferToArrayBuffer(key), - value: bufferToArrayBuffer(value), - }); - } - } - - // Update state with new kvStorage - entry.state = { - ...entry.state, - kvStorage: newKvStorage, - }; - - // Save state to disk - if (this.#persist) { - await this.#performWrite( - actorId, - entry.generation, - entry.state, - ); - } + const db = this.#getOrCreateActorKvDatabase(actorId); + this.#putKvEntriesInDb(db, entries); }); } @@ -1207,18 +1348,18 @@ export class FileSystemGlobalState { } } + const db = this.#getOrCreateActorKvDatabase(actorId); const results: (Uint8Array | null)[] = []; for (const key of keys) { - // Find entry with the same key - const foundEntry = entry.state.kvStorage.find((e) => - arrayBuffersEqual(e.key, bufferToArrayBuffer(key)), + const row = db.get<{ value: Uint8Array | ArrayBuffer }>( + "SELECT value FROM kv WHERE key = ?", + [key], ); - - if (foundEntry) { - results.push(new Uint8Array(foundEntry.value)); - } else { + if (!row) { results.push(null); + continue; } + results.push(ensureUint8Array(row.value, "value")); } return results; } @@ -1235,34 +1376,24 @@ export class FileSystemGlobalState { } throw new Error(`Actor ${actorId} state not loaded`); } - - // Create a mutable copy of kvStorage - const newKvStorage = [...entry.state.kvStorage]; - - // Delete entries from kvStorage - for (const key of keys) { - const indexToDelete = newKvStorage.findIndex((e) => - arrayBuffersEqual(e.key, bufferToArrayBuffer(key)), - ); - - if (indexToDelete >= 0) { - newKvStorage.splice(indexToDelete, 1); - } + if (keys.length === 0) { + return; } - // Update state with new kvStorage - entry.state = { - ...entry.state, - kvStorage: newKvStorage, - }; - - // Save state to disk - if (this.#persist) { - await this.#performWrite( - actorId, - entry.generation, - entry.state, - ); + const db = this.#getOrCreateActorKvDatabase(actorId); + db.exec("BEGIN"); + try { + for (const key of keys) { + db.run("DELETE FROM kv WHERE key = ?", [key]); + } + db.exec("COMMIT"); + } catch (error) { + try { + db.exec("ROLLBACK"); + } catch { + // Ignore rollback errors, original error is more actionable. + } + throw error; } }); } @@ -1284,23 +1415,21 @@ export class FileSystemGlobalState { } } - const results: [Uint8Array, Uint8Array][] = []; - for (const kvEntry of entry.state.kvStorage) { - const keyBytes = new Uint8Array(kvEntry.key); - // Check if key starts with prefix - if (keyBytes.length >= prefix.length) { - let hasPrefix = true; - for (let i = 0; i < prefix.length; i++) { - if (keyBytes[i] !== prefix[i]) { - hasPrefix = false; - break; - } - } - if (hasPrefix) { - results.push([keyBytes, new Uint8Array(kvEntry.value)]); - } - } - } - return results; + const db = this.#getOrCreateActorKvDatabase(actorId); + const upperBound = computePrefixUpperBound(prefix); + const rows = upperBound + ? db.all<{ key: Uint8Array | ArrayBuffer; value: Uint8Array | ArrayBuffer }>( + "SELECT key, value FROM kv WHERE key >= ? AND key < ? ORDER BY key ASC", + [prefix, upperBound], + ) + : db.all<{ key: Uint8Array | ArrayBuffer; value: Uint8Array | ArrayBuffer }>( + "SELECT key, value FROM kv WHERE key >= ? ORDER BY key ASC", + [prefix], + ); + + return rows.map((row) => [ + ensureUint8Array(row.key, "key"), + ensureUint8Array(row.value, "value"), + ]); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/mod.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/mod.ts index 56c1c6e74d..81ce39e784 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/mod.ts @@ -16,6 +16,8 @@ export { getStoragePath } from "./utils"; const CreateFileSystemDriverOptionsSchema = z.object({ /** Custom path for storage. */ path: z.string().optional(), + /** Deprecated: file-system driver KV is now always SQLite-backed. */ + useNativeSqlite: z.boolean().optional(), }); type CreateFileSystemDriverOptionsInput = z.input< @@ -28,9 +30,16 @@ export function createFileSystemOrMemoryDriver( ): DriverConfig { importNodeDependencies(); + if (options?.useNativeSqlite === false) { + throw new Error( + "File-system driver no longer supports non-SQLite KV storage. Remove useNativeSqlite: false.", + ); + } + const stateOptions: FileSystemDriverOptions = { persist, customPath: options?.path, + useNativeSqlite: true, }; const state = new FileSystemGlobalState(stateOptions); const driverConfig: DriverConfig = { diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts new file mode 100644 index 0000000000..323b7ebd2a --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/sqlite-runtime.ts @@ -0,0 +1,206 @@ +import { getRequireFn } from "@/utils/node"; + +type SqliteRuntimeKind = "bun" | "node" | "better-sqlite3"; + +interface SqliteStatement { + run(...params: unknown[]): unknown; + get(...params: unknown[]): unknown; + all(...params: unknown[]): unknown[]; +} + +interface SqliteRawDatabase { + exec(sql: string): unknown; + close(): unknown; + prepare?(sql: string): SqliteStatement; + query?(sql: string): SqliteStatement; +} + +export interface SqliteRuntimeDatabase { + exec(sql: string): void; + run(sql: string, params?: readonly unknown[]): void; + get>( + sql: string, + params?: readonly unknown[], + ): T | undefined; + all>( + sql: string, + params?: readonly unknown[], + ): T[]; + close(): void; +} + +export interface SqliteRuntime { + kind: SqliteRuntimeKind; + open(path: string): SqliteRuntimeDatabase; +} + +function normalizeParams(params: readonly unknown[] | undefined): unknown[] { + if (!params || params.length === 0) { + return []; + } + + return params.map((value) => { + if (value instanceof Uint8Array) { + return Buffer.from(value); + } + return value; + }); +} + +function createPreparedDatabaseAdapter( + rawDb: SqliteRawDatabase, + prepare: (sql: string) => SqliteStatement, +): SqliteRuntimeDatabase { + return { + exec: (sql) => { + rawDb.exec(sql); + }, + run: (sql, params) => { + const stmt = prepare(sql); + stmt.run(...normalizeParams(params)); + }, + get: >( + sql: string, + params?: readonly unknown[], + ) => { + const stmt = prepare(sql); + return stmt.get(...normalizeParams(params)) as T | undefined; + }, + all: >( + sql: string, + params?: readonly unknown[], + ) => { + const stmt = prepare(sql); + return stmt.all(...normalizeParams(params)) as T[]; + }, + close: () => { + rawDb.close(); + }, + }; +} + +export function loadSqliteRuntime(): SqliteRuntime { + const requireFn = getRequireFn(); + const loadErrors: string[] = []; + + try { + const bunSqlite = requireFn(/* webpackIgnore: true */ "bun:sqlite") as { + Database: new (path: string) => SqliteRawDatabase; + }; + if (typeof bunSqlite.Database === "function") { + return { + kind: "bun", + open: (path) => { + const rawDb = new bunSqlite.Database(path); + const query = (sql: string) => { + if (!rawDb.query) { + throw new Error("bun:sqlite database missing query method"); + } + return rawDb.query(sql); + }; + return createPreparedDatabaseAdapter(rawDb, query); + }, + }; + } + } catch (error) { + loadErrors.push(`bun:sqlite unavailable: ${String(error)}`); + } + + try { + const nodeSqlite = requireFn(/* webpackIgnore: true */ "node:sqlite") as { + DatabaseSync: new (path: string) => SqliteRawDatabase; + }; + if (typeof nodeSqlite.DatabaseSync === "function") { + return { + kind: "node", + open: (path) => { + const rawDb = new nodeSqlite.DatabaseSync(path); + const prepare = (sql: string) => { + if (!rawDb.prepare) { + throw new Error( + "node:sqlite DatabaseSync missing prepare method", + ); + } + return rawDb.prepare(sql); + }; + return createPreparedDatabaseAdapter(rawDb, prepare); + }, + }; + } + } catch (error) { + loadErrors.push(`node:sqlite unavailable: ${String(error)}`); + } + + try { + const betterSqlite3Module = requireFn( + /* webpackIgnore: true */ "better-sqlite3", + ) as { + default?: new (path: string) => SqliteRawDatabase; + } | (new (path: string) => SqliteRawDatabase); + const BetterSqlite3 = + typeof betterSqlite3Module === "function" + ? betterSqlite3Module + : betterSqlite3Module.default; + + if (typeof BetterSqlite3 === "function") { + return { + kind: "better-sqlite3", + open: (path) => { + const rawDb = new BetterSqlite3(path); + const prepare = (sql: string) => { + if (!rawDb.prepare) { + throw new Error( + "better-sqlite3 database missing prepare method", + ); + } + return rawDb.prepare(sql); + }; + return createPreparedDatabaseAdapter(rawDb, prepare); + }, + }; + } + } catch (error) { + loadErrors.push(`better-sqlite3 unavailable: ${String(error)}`); + throw new Error( + `No SQLite runtime available. Tried bun:sqlite, node:sqlite, and better-sqlite3. Install better-sqlite3 (e.g. "pnpm add better-sqlite3") if native runtimes are unavailable.\n${loadErrors.join("\n")}`, + ); + } + + throw new Error( + `No SQLite runtime available. Tried bun:sqlite, node:sqlite, and better-sqlite3.\n${loadErrors.join("\n")}`, + ); +} + +export function computePrefixUpperBound( + prefix: Uint8Array, +): Uint8Array | undefined { + if (prefix.length === 0) { + return undefined; + } + + const upperBound = new Uint8Array(prefix); + for (let i = upperBound.length - 1; i >= 0; i--) { + if (upperBound[i] !== 0xff) { + upperBound[i] += 1; + return upperBound.slice(0, i + 1); + } + } + return undefined; +} + +export function ensureUint8Array( + value: unknown, + fieldName: string, +): Uint8Array { + if (value instanceof Uint8Array) { + return value; + } + if (value instanceof ArrayBuffer) { + return new Uint8Array(value); + } + if (ArrayBuffer.isView(value)) { + const view = value as ArrayBufferView; + return new Uint8Array(view.buffer, view.byteOffset, view.byteLength); + } + throw new Error(`SQLite row field "${fieldName}" is not binary data`); +} diff --git a/rivetkit-typescript/packages/rivetkit/tests/file-system-kv-migration.test.ts b/rivetkit-typescript/packages/rivetkit/tests/file-system-kv-migration.test.ts new file mode 100644 index 0000000000..7783d019c5 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/tests/file-system-kv-migration.test.ts @@ -0,0 +1,128 @@ +import { + copyFileSync, + mkdirSync, + mkdtempSync, + readFileSync, + readdirSync, + rmSync, +} from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { describe, expect, it } from "vitest"; +import { importNodeDependencies } from "@/utils/node"; +import { FileSystemGlobalState } from "@/drivers/file-system/global-state"; +import { loadSqliteRuntime } from "@/drivers/file-system/sqlite-runtime"; + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); +const fixtureStateDir = join(__dirname, "fixtures", "legacy-kv", "state"); + +function makeStorageFromFixtures(): string { + const storageRoot = mkdtempSync(join(tmpdir(), "rivetkit-kv-migration-")); + + const stateDir = join(storageRoot, "state"); + const dbDir = join(storageRoot, "databases"); + const alarmsDir = join(storageRoot, "alarms"); + mkdirSync(stateDir, { recursive: true }); + mkdirSync(dbDir, { recursive: true }); + mkdirSync(alarmsDir, { recursive: true }); + + for (const fileName of readdirSync(fixtureStateDir)) { + copyFileSync(join(fixtureStateDir, fileName), join(stateDir, fileName)); + } + + return storageRoot; +} + +describe("file-system driver legacy KV startup migration", () => { + it("migrates legacy actor kvStorage into sqlite databases on startup", async () => { + importNodeDependencies(); + const storageRoot = makeStorageFromFixtures(); + try { + const actorOneStatePath = join(storageRoot, "state", "legacy-actor-one"); + const actorOneStateBefore = readFileSync(actorOneStatePath); + + const state = new FileSystemGlobalState({ + persist: true, + customPath: storageRoot, + useNativeSqlite: true, + }); + + const alpha = await state.kvBatchGet("legacy-actor-one", [ + encoder.encode("alpha"), + ]); + expect(alpha[0]).not.toBeNull(); + expect(decoder.decode(alpha[0] ?? new Uint8Array())).toBe("one"); + + const prefixed = await state.kvListPrefix( + "legacy-actor-one", + encoder.encode("prefix:"), + ); + expect(prefixed).toHaveLength(2); + + const sqliteRuntime = loadSqliteRuntime(); + const actorTwoDb = sqliteRuntime.open( + join(storageRoot, "databases", "legacy-actor-two.db"), + ); + const actorTwoRow = actorTwoDb.get<{ value: Uint8Array | ArrayBuffer }>( + "SELECT value FROM kv WHERE key = ?", + [encoder.encode("beta")], + ); + expect(actorTwoRow).toBeDefined(); + expect( + decoder.decode( + (actorTwoRow?.value as Uint8Array | ArrayBuffer) ?? new Uint8Array(), + ), + ).toBe("two"); + actorTwoDb.close(); + + // Migration must not mutate legacy state files. + const actorOneStateAfter = readFileSync(actorOneStatePath); + expect(Buffer.compare(actorOneStateBefore, actorOneStateAfter)).toBe(0); + } finally { + rmSync(storageRoot, { recursive: true, force: true }); + } + }); + + it("does not overwrite sqlite data when database is already populated", async () => { + importNodeDependencies(); + const storageRoot = makeStorageFromFixtures(); + try { + const sqliteRuntime = loadSqliteRuntime(); + const actorDbPath = join(storageRoot, "databases", "legacy-actor-one.db"); + const db = sqliteRuntime.open(actorDbPath); + db.exec(` + CREATE TABLE IF NOT EXISTS kv ( + key BLOB PRIMARY KEY NOT NULL, + value BLOB NOT NULL + ) + `); + db.run("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)", [ + encoder.encode("alpha"), + encoder.encode("existing"), + ]); + db.close(); + + const state = new FileSystemGlobalState({ + persist: true, + customPath: storageRoot, + useNativeSqlite: true, + }); + void state; + const checkDb = sqliteRuntime.open(actorDbPath); + const alpha = checkDb.get<{ value: Uint8Array | ArrayBuffer }>( + "SELECT value FROM kv WHERE key = ?", + [encoder.encode("alpha")], + ); + expect(alpha).toBeDefined(); + expect( + decoder.decode( + (alpha?.value as Uint8Array | ArrayBuffer) ?? new Uint8Array(), + ), + ).toBe("existing"); + checkDb.close(); + } finally { + rmSync(storageRoot, { recursive: true, force: true }); + } + }); +}); diff --git a/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-one b/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-one new file mode 100644 index 0000000000..caac5e87ea Binary files /dev/null and b/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-one differ diff --git a/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-two b/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-two new file mode 100644 index 0000000000..799a59984a Binary files /dev/null and b/rivetkit-typescript/packages/rivetkit/tests/fixtures/legacy-kv/state/legacy-actor-two differ