Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions engine/packages/pegboard/src/actor_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ mod entry;
mod utils;

const VERSION: &str = env!("CARGO_PKG_VERSION");

// Keep the KV validation limits below in sync with
// rivetkit-typescript/packages/rivetkit/src/drivers/file-system/kv-limits.ts.
const MAX_KEY_SIZE: usize = 2 * 1024;
const MAX_VALUE_SIZE: usize = 128 * 1024;
const MAX_KEYS: usize = 128;
Expand Down
3 changes: 2 additions & 1 deletion engine/packages/pegboard/src/actor_kv/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ pub fn validate_entries(
for value in values {
ensure!(
value.len() <= MAX_VALUE_SIZE,
"value is too large (max 128 KiB)"
"value is too large (max {} KiB)",
MAX_VALUE_SIZE / 1024
);
}

Expand Down
119 changes: 29 additions & 90 deletions rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type SqliteRemoteDatabase,
} from "drizzle-orm/sqlite-proxy";
import type { DatabaseProvider, RawAccess } from "../config";
import type { KvVfsOptions } from "../sqlite-vfs";
import { AsyncMutex, createActorKvStore, toSqliteBindings } from "../shared";

export * from "./sqlite-core";

Expand All @@ -27,73 +27,12 @@ interface DatabaseFactoryConfig<
migrations?: any;
}

/**
* Create a KV store wrapper that uses the actor driver's KV operations
*/
function createActorKvStore(kv: {
batchPut: (entries: [Uint8Array, Uint8Array][]) => Promise<void>;
batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>;
batchDelete: (keys: Uint8Array[]) => Promise<void>;
}): KvVfsOptions {
return {
get: async (key: Uint8Array) => {
const results = await kv.batchGet([key]);
return results[0];
},
getBatch: async (keys: Uint8Array[]) => {
return await kv.batchGet(keys);
},
put: async (key: Uint8Array, value: Uint8Array) => {
await kv.batchPut([[key, value]]);
},
putBatch: async (entries: [Uint8Array, Uint8Array][]) => {
await kv.batchPut(entries);
},
deleteBatch: async (keys: Uint8Array[]) => {
await kv.batchDelete(keys);
},
};
}

/**
* Mutex to serialize async operations on a @rivetkit/sqlite database handle.
* @rivetkit/sqlite is not safe for concurrent operations on the same handle.
*/
class DbMutex {
#locked = false;
#waiting: (() => void)[] = [];

async acquire(): Promise<void> {
while (this.#locked) {
await new Promise<void>((resolve) => this.#waiting.push(resolve));
}
this.#locked = true;
}

release(): void {
this.#locked = false;
const next = this.#waiting.shift();
if (next) {
next();
}
}

async run<T>(fn: () => Promise<T>): Promise<T> {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
}

/**
* Create a sqlite-proxy async callback from a @rivetkit/sqlite Database
*/
function createProxyCallback(
waDb: Database,
mutex: DbMutex,
mutex: AsyncMutex,
isClosed: () => boolean,
) {
return async (
Expand All @@ -107,12 +46,12 @@ function createProxyCallback(
}

if (method === "run") {
await waDb.run(sql, params);
await waDb.run(sql, toSqliteBindings(params));
return { rows: [] };
}

// For all/get/values, use parameterized query
const result = await waDb.query(sql, params);
const result = await waDb.query(sql, toSqliteBindings(params));

// drizzle's mapResultRow accesses rows by column index (positional arrays)
// so we return raw arrays for all methods
Expand All @@ -131,7 +70,7 @@ function createProxyCallback(
*/
async function runInlineMigrations(
waDb: Database,
mutex: DbMutex,
mutex: AsyncMutex,
migrations: any,
): Promise<void> {
// Create migrations table
Expand Down Expand Up @@ -174,8 +113,9 @@ async function runInlineMigrations(

// Record migration
await mutex.run(() =>
waDb.exec(
`INSERT INTO __drizzle_migrations (hash, created_at) VALUES ('${entry.tag}', ${entry.when})`,
waDb.run(
"INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)",
[entry.tag, entry.when],
),
);
}
Expand All @@ -188,7 +128,7 @@ export function db<
): DatabaseProvider<SqliteRemoteDatabase<TSchema> & RawAccess> {
// Store the @rivetkit/sqlite Database instance alongside the drizzle client
let waDbInstance: Database | null = null;
const mutex = new DbMutex();
const mutex = new AsyncMutex();

return {
createClient: async (ctx) => {
Expand Down Expand Up @@ -226,18 +166,17 @@ export function db<
ensureOpen();

if (args.length > 0) {
const result = await waDb.query(query, args);
const result = await waDb.query(
query,
toSqliteBindings(args),
);
return result.rows.map((row: unknown[]) => {
const obj: Record<string, unknown> = {};
for (
let i = 0;
i < result.columns.length;
i++
) {
obj[result.columns[i]] = row[i];
}
return obj;
}) as TRow[];
for (let i = 0; i < result.columns.length; i++) {
obj[result.columns[i]] = row[i];
}
return obj;
}) as TRow[];
}
// Use exec for non-parameterized queries since
// @rivetkit/sqlite's query() can crash on some statements.
Expand All @@ -246,17 +185,17 @@ export function db<
await waDb.exec(
query,
(row: unknown[], columns: string[]) => {
if (!columnNames) {
columnNames = columns;
}
const obj: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
obj[columnNames[i]] = row[i];
}
results.push(obj);
},
);
return results as TRow[];
if (!columnNames) {
columnNames = columns;
}
const obj: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
obj[columnNames[i]] = row[i];
}
results.push(obj);
},
);
return results as TRow[];
});
},
close: async () => {
Expand Down
52 changes: 7 additions & 45 deletions rivetkit-typescript/packages/rivetkit/src/db/mod.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,12 @@
import type { KvVfsOptions } from "./sqlite-vfs";
import type { DatabaseProvider, RawAccess } from "./config";
import { AsyncMutex, createActorKvStore, toSqliteBindings } from "./shared";

export type { RawAccess } from "./config";

interface DatabaseFactoryConfig {
onMigrate?: (db: RawAccess) => Promise<void> | void;
}

/**
* Create a KV store wrapper that uses the actor driver's KV operations
*/
function createActorKvStore(kv: {
batchPut: (entries: [Uint8Array, Uint8Array][]) => Promise<void>;
batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>;
batchDelete: (keys: Uint8Array[]) => Promise<void>;
}): KvVfsOptions {
return {
get: async (key: Uint8Array) => {
const results = await kv.batchGet([key]);
return results[0];
},
getBatch: async (keys: Uint8Array[]) => {
return await kv.batchGet(keys);
},
put: async (key: Uint8Array, value: Uint8Array) => {
await kv.batchPut([[key, value]]);
},
putBatch: async (entries: [Uint8Array, Uint8Array][]) => {
await kv.batchPut(entries);
},
deleteBatch: async (keys: Uint8Array[]) => {
await kv.batchDelete(keys);
},
};
}

export function db({
onMigrate,
}: DatabaseFactoryConfig = {}): DatabaseProvider<RawAccess> {
Expand Down Expand Up @@ -70,23 +42,12 @@ export function db({
const kvStore = createActorKvStore(ctx.kv);
const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore);
let closed = false;
const mutex = new AsyncMutex();
const ensureOpen = () => {
if (closed) {
throw new Error("database is closed");
}
};
let op: Promise<void> = Promise.resolve();

const serialize = async <T>(fn: () => Promise<T>): Promise<T> => {
// Ensure @rivetkit/sqlite calls are not concurrent. Actors can process multiple
// actions concurrently, and @rivetkit/sqlite is not re-entrant.
const next = op.then(fn, fn);
op = next.then(
() => undefined,
() => undefined,
);
return await next;
};

return {
execute: async <
Expand All @@ -95,7 +56,7 @@ export function db({
query: string,
...args: unknown[]
): Promise<TRow[]> => {
return await serialize(async () => {
return await mutex.run(async () => {
ensureOpen();

// `db.exec` does not support binding `?` placeholders.
Expand All @@ -104,14 +65,15 @@ export function db({
// Keep using `db.exec` for non-parameterized SQL because it
// supports multi-statement migrations.
if (args.length > 0) {
const bindings = toSqliteBindings(args);
const token = query.trimStart().slice(0, 16).toUpperCase();
const returnsRows =
token.startsWith("SELECT") ||
token.startsWith("PRAGMA") ||
token.startsWith("WITH");

if (returnsRows) {
const { rows, columns } = await db.query(query, args);
const { rows, columns } = await db.query(query, bindings);
return rows.map((row: unknown[]) => {
const rowObj: Record<string, unknown> = {};
for (let i = 0; i < columns.length; i++) {
Expand All @@ -121,7 +83,7 @@ export function db({
}) as TRow[];
}

await db.run(query, args);
await db.run(query, bindings);
return [] as TRow[];
}

Expand All @@ -141,7 +103,7 @@ export function db({
});
},
close: async () => {
await serialize(async () => {
await mutex.run(async () => {
if (closed) {
return;
}
Expand Down
Loading
Loading