diff --git a/frontend/src/components/actors/actor-database.tsx b/frontend/src/components/actors/actor-database.tsx index 821b16aeda..3930336b34 100644 --- a/frontend/src/components/actors/actor-database.tsx +++ b/frontend/src/components/actors/actor-database.tsx @@ -1,9 +1,205 @@ +import { Button, Flex, ScrollArea, WithTooltip } from "@/components"; +import { + faChevronLeft, + faChevronRight, + faRefresh, + faTable, + faTableCells, + Icon, +} from "@rivet-gg/icons"; +import { useQuery } from "@tanstack/react-query"; +import { useState } from "react"; +import { ShimmerLine } from "../shimmer-line"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "../ui/select"; +import { useActorInspector } from "./actor-inspector-context"; +import { DatabaseTable } from "./database/database-table"; import type { ActorId } from "./queries"; +const PAGE_SIZE = 100; + interface ActorDatabaseProps { actorId: ActorId; } -export function ActorDatabase(_props: ActorDatabaseProps) { - return null; +export function ActorDatabase({ actorId }: ActorDatabaseProps) { + const actorInspector = useActorInspector(); + const { data, refetch } = useQuery( + actorInspector.actorDatabaseQueryOptions(actorId), + ); + const [table, setTable] = useState( + () => data?.tables?.[0]?.table.name, + ); + const [page, setPage] = useState(0); + + const selectedTable = table || data?.tables?.[0]?.table.name; + + const { + data: rows, + refetch: refetchData, + isLoading, + } = useQuery({ + ...actorInspector.actorDatabaseRowsQueryOptions( + actorId, + selectedTable ?? "", + page, + PAGE_SIZE, + ), + enabled: !!selectedTable, + }); + + const currentTable = data?.tables?.find( + (t) => t.table.name === selectedTable, + ); + + const totalRows = currentTable?.records ?? 0; + const totalPages = Math.max(1, Math.ceil(totalRows / PAGE_SIZE)); + const hasNextPage = page < totalPages - 1; + const hasPrevPage = page > 0; + + return ( + <> +
+
+ { + setTable(t); + setPage(0); + }} + value={selectedTable} + /> +
+
+ + + {currentTable ? ( + <> + {currentTable.table.schema}. + {currentTable.table.name} + + ({currentTable.columns.length} columns,{" "} + {currentTable.records} rows) + + + ) : ( + + No table selected + + )} + +
+
+
+ setPage((p) => p - 1)} + > + + + } + /> + + {page + 1} / {totalPages} + + setPage((p) => p + 1)} + > + + + } + /> +
+ { + refetch(); + refetchData(); + }} + > + + + } + /> +
+
+
+ {isLoading ? : null} + + {currentTable ? ( + + ) : null} + +
+ + ); +} + +function TableSelect({ + actorId, + value, + onSelect, +}: { + actorId: ActorId; + onSelect: (table: string) => void; + value: string | undefined; +}) { + const actorInspector = useActorInspector(); + const { data: tables } = useQuery( + actorInspector.actorDatabaseTablesQueryOptions(actorId), + ); + + return ( + + ); } diff --git a/frontend/src/components/actors/actor-inspector-context.tsx b/frontend/src/components/actors/actor-inspector-context.tsx index 012dc84eca..6c2001ac51 100644 --- a/frontend/src/components/actors/actor-inspector-context.tsx +++ b/frontend/src/components/actors/actor-inspector-context.tsx @@ -59,6 +59,33 @@ type QueueStatusSummary = { }>; }; +export type DatabaseColumn = { + cid: number; + name: string; + type: string; + notnull: boolean; + dflt_value: string | null; + pk: boolean | null; +}; + +export type DatabaseForeignKey = { + id: number; + table: string; + from: string; + to: string; +}; + +export type DatabaseTableInfo = { + table: { schema: string; name: string; type: string }; + columns: DatabaseColumn[]; + foreignKeys: DatabaseForeignKey[]; + records: number; +}; + +export type DatabaseSchema = { + tables: DatabaseTableInfo[]; +}; + interface ActorInspectorApi { ping: () => Promise; executeAction: (name: string, args: unknown[]) => Promise; @@ -72,6 +99,12 @@ interface ActorInspectorApi { history: WorkflowHistory | null; isEnabled: boolean; }>; + getDatabaseSchema: () => Promise; + getDatabaseTableRows: ( + table: string, + limit: number, + offset: number, + ) => Promise; getMetadata: () => Promise<{ version: string }>; } @@ -84,6 +117,7 @@ type FeatureSupport = { const MIN_RIVETKIT_VERSION_TRACES = "2.0.40"; const MIN_RIVETKIT_VERSION_QUEUE = "2.0.40"; +const MIN_RIVETKIT_VERSION_DATABASE = "2.0.42"; const INSPECTOR_ERROR_EVENTS_DROPPED = "inspector.events_dropped"; function parseSemver(version?: string) { @@ -152,10 +186,7 @@ function getInspectorProtocolVersion(version: string | undefined) { if (!parsed) { return 2; } - if (isVersionAtLeast(version, MIN_RIVETKIT_VERSION_QUEUE)) { - return 4; - } - if (isVersionAtLeast(version, MIN_RIVETKIT_VERSION_TRACES)) { + if (isVersionAtLeast(version, MIN_RIVETKIT_VERSION_DATABASE)) { return 3; } if (parsed.major >= 2) { @@ -214,37 +245,31 @@ export const createDefaultActorInspectorContext = ({ }, actorDatabaseQueryOptions(actorId: ActorId) { - // TODO: implement return queryOptions({ - staleTime: Infinity, + staleTime: 0, queryKey: actorInspectorQueriesKeys.actorDatabase(actorId), queryFn: () => { - return { enabled: false, db: [] } as unknown as { - enabled: boolean; - db: { - table: { name: string; type: string }; - records: number; - }[]; - }; + return api.getDatabaseSchema(); }, }); }, actorDatabaseEnabledQueryOptions(actorId: ActorId) { - // TODO: implement return queryOptions({ staleTime: Infinity, - ...this.actorDatabaseQueryOptions(actorId), - select: (data) => data.enabled, + queryKey: [ + ...actorInspectorQueriesKeys.actorDatabase(actorId), + "enabled", + ], + queryFn: () => new Promise(() => {}), }); }, actorDatabaseTablesQueryOptions(actorId: ActorId) { - // TODO: implement return queryOptions({ ...this.actorDatabaseQueryOptions(actorId), select: (data) => - data.db?.map((table) => ({ + data.tables?.map((table) => ({ name: table.table.name, type: table.table.type, records: table.records, @@ -253,16 +278,27 @@ export const createDefaultActorInspectorContext = ({ }); }, - actorDatabaseRowsQueryOptions(actorId: ActorId, table: string) { - // TODO: implement + actorDatabaseRowsQueryOptions( + actorId: ActorId, + table: string, + page: number, + pageSize = 100, + ) { return queryOptions({ - staleTime: Infinity, + staleTime: 0, + gcTime: 5000, queryKey: [ ...actorInspectorQueriesKeys.actorDatabase(actorId), table, + page, + pageSize, ], queryFn: () => { - return [] as unknown as Record[]; + return api.getDatabaseTableRows( + table, + pageSize, + page * pageSize, + ); }, }); }, @@ -730,6 +766,55 @@ export const ActorInspectorProvider = ({ return promise; }, + getDatabaseSchema: async () => { + const { id, promise } = + actionsManager.current.createResolver({ + name: "getDatabaseSchema", + timeoutMs: 10_000, + }); + + sendMessage( + serverMessage( + { + body: { + tag: "DatabaseSchemaRequest", + val: { id: BigInt(id) }, + }, + }, + inspectorProtocolVersion, + ), + ); + + return promise; + }, + + getDatabaseTableRows: async (table, limit, offset) => { + const { id, promise } = + actionsManager.current.createResolver({ + name: "getDatabaseTableRows", + timeoutMs: 10_000, + }); + + sendMessage( + serverMessage( + { + body: { + tag: "DatabaseTableRowsRequest", + val: { + id: BigInt(id), + table, + limit: BigInt(limit), + offset: BigInt(offset), + }, + }, + }, + inspectorProtocolVersion, + ), + ); + + return promise; + }, + getMetadata() { return getActorMetadataProxy.current(); }, @@ -811,6 +896,14 @@ const createMessageHandler = body.val.isWorkflowEnabled, ); + queryClient.setQueryData( + [ + ...actorInspectorQueriesKeys.actorDatabase(actorId), + "enabled", + ], + body.val.isDatabaseEnabled, + ); + if (body.val.workflowHistory) { queryClient.setQueryData( actorInspectorQueriesKeys.actorWorkflowHistory(actorId), @@ -904,6 +997,20 @@ const createMessageHandler = isEnabled: body.val.isWorkflowEnabled, }); }) + .with({ tag: "DatabaseSchemaResponse" }, (body) => { + const { rid } = body.val; + actionsManager.current.resolve( + Number(rid), + cbor.decode(new Uint8Array(body.val.schema)), + ); + }) + .with({ tag: "DatabaseTableRowsResponse" }, (body) => { + const { rid } = body.val; + actionsManager.current.resolve( + Number(rid), + cbor.decode(new Uint8Array(body.val.result)), + ); + }) .with({ tag: "Error" }, (body) => { if (body.val.message === INSPECTOR_ERROR_EVENTS_DROPPED) { return; diff --git a/frontend/src/components/actors/actor-traces.tsx b/frontend/src/components/actors/actor-traces.tsx index 3faa1492a3..98d5043333 100644 --- a/frontend/src/components/actors/actor-traces.tsx +++ b/frontend/src/components/actors/actor-traces.tsx @@ -21,7 +21,6 @@ import { SelectTrigger, SelectValue, } from "../ui/select"; -import { ToggleGroup, ToggleGroupItem } from "../ui/toggle-group"; import { useActorInspector } from "./actor-inspector-context"; import { ActorObjectInspector } from "./console/actor-inspector"; import type { ActorId } from "./queries"; @@ -212,7 +211,7 @@ export function ActorTraces({ actorId }: { actorId: ActorId }) { : "Select a time range"}
- @@ -229,7 +228,7 @@ export function ActorTraces({ actorId }: { actorId: ActorId }) { > - + */}
{viewType === "list" ? ( diff --git a/frontend/src/components/actors/actors-actor-details.tsx b/frontend/src/components/actors/actors-actor-details.tsx index 4cd720e59d..f170c0878f 100644 --- a/frontend/src/components/actors/actors-actor-details.tsx +++ b/frontend/src/components/actors/actors-actor-details.tsx @@ -157,13 +157,13 @@ export function ActorTabs({ Workflow - {/* Database - */} + {/* { + return createColumns(dbCols, references, { enableRowSelection }); + }, [dbCols, references, enableRowSelection]); + + const [rowSelection, setRowSelection] = useState({}); + const [sorting, setSorting] = useState([]); + + const table = useTable({ + columns, + data: data as Record[], + enableRowSelection, + enableSorting, + enableColumnResizing, + getCoreRowModel: getCoreRowModel(), + getExpandedRowModel: getExpandedRowModel(), + getSortedRowModel: getSortedRowModel(), + defaultColumn: {}, + columnResizeMode: "onChange", + onSortingChange: setSorting, + onRowSelectionChange: setRowSelection, + paginateExpandedRows: false, + state: { + sorting, + rowSelection, + }, + }); + + const calculateColumnSizes = useCallback(() => { + const headers = table.getFlatHeaders(); + const colSizes: { [key: string]: number } = {}; + for (let i = 0; i < headers.length; i++) { + const header = headers[i]; + colSizes[`--header-${header.id}-size`] = header.getSize(); + colSizes[`--col-${header.column.id}-size`] = + header.column.getSize(); + } + return colSizes; + }, [table]); + + const columnSizeVars = useMemo(() => { + return calculateColumnSizes(); + }, [calculateColumnSizes]); + + return ( + + + {table.getHeaderGroups().map((headerGroup) => ( + + {headerGroup.headers.map((header) => { + return ( + + {header.isPlaceholder ? null : header.column.getCanSort() ? ( + + ) : ( +
+ {flexRender( + header.column.columnDef.header, + header.getContext(), + )} +
+ )} + {header.column.getCanResize() ? ( + // biome-ignore lint/a11y/noStaticElementInteractions: resize handle uses mouse drag +
+
+
+ ) : null} + + ); + })} + + ))} + + + {table.getRowModel().rows.map((row) => ( + + + {row.getVisibleCells().map((cell) => ( + +
+
+ {flexRender( + cell.column.columnDef.cell, + cell.getContext(), + )} +
+
+
+ ))} +
+
+ ))} +
+
+ ); +} + +const ch = createColumnHelper>(); + +function createColumns( + columns: DatabaseColumn[], + references?: DatabaseForeignKey[], + { enableRowSelection }: { enableRowSelection?: boolean } = {}, +) { + return [ + ...[ + enableRowSelection + ? ch.display({ + id: "select", + enableResizing: false, + header: ({ table }) => ( + { + if (value === "indeterminate") { + table.toggleAllRowsSelected(true); + return; + } + table.toggleAllRowsSelected(!!value); + }} + aria-label="Select all" + /> + ), + cell: ({ row }) => ( + { + if (value === "indeterminate") { + row.toggleSelected(true); + return; + } + row.toggleSelected(); + }} + /> + ), + }) + : null, + ].filter((v): v is NonNullable => v !== null), + ...columns.map((col) => + ch.accessor(col.name, { + header: () => ( + + {col.name}{" "} + + {col.type} + + + + ), + cell: (info) => { + if (col.type === "blob") { + return ( + + BINARY + + ); + } + const value = info.getValue(); + if (value === null) { + return ( + + NULL + + ); + } + + return <>{String(info.getValue())}; + }, + }), + ), + ]; +} + +function ForeignKey({ + references, + column, +}: { + references?: DatabaseForeignKey[]; + column: DatabaseColumn; +}) { + const ref = references?.find((r) => r.from === column.name); + if (!ref) return null; + return ( + + + {ref.table}.{ref.to} + + ); } diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index dccaf71807..d806e15032 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -193,7 +193,7 @@ ], "scripts": { "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts src/workflow/mod.ts src/db/mod.ts src/db/drizzle/mod.ts", - "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/persist/v1.bare -o dist/schemas/persist/v1.ts && ./scripts/compile-bare.ts compile schemas/transport/v1.bare -o dist/schemas/transport/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts", + "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/persist/v1.bare -o dist/schemas/persist/v1.ts && ./scripts/compile-bare.ts compile schemas/transport/v1.bare -o dist/schemas/transport/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v3.bare -o dist/schemas/actor-inspector/v3.ts", "check-types": "tsc --noEmit", "lint": "biome check .", "lint:fix": "biome check --write .", diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare new file mode 100644 index 0000000000..3d8aba46de --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v3.bare @@ -0,0 +1,195 @@ +# MARK: Message To Server + +type PatchStateRequest struct { + state: data +} + +type ActionRequest struct { + id: uint + name: str + args: data +} + +type StateRequest struct { + id: uint +} + +type ConnectionsRequest struct { + id: uint +} + +type RpcsListRequest struct { + id: uint +} + +type TraceQueryRequest struct { + id: uint + startMs: uint + endMs: uint + limit: uint +} + +type QueueRequest struct { + id: uint + limit: uint +} + +type WorkflowHistoryRequest struct { + id: uint +} + +type DatabaseSchemaRequest struct { + id: uint +} + +# Fetches rows from a specific table with a row limit and offset. +type DatabaseTableRowsRequest struct { + id: uint + table: str + limit: uint + offset: uint +} + +type ToServerBody union { + PatchStateRequest | + StateRequest | + ConnectionsRequest | + ActionRequest | + RpcsListRequest | + TraceQueryRequest | + QueueRequest | + WorkflowHistoryRequest | + DatabaseSchemaRequest | + DatabaseTableRowsRequest +} + +type ToServer struct { + body: ToServerBody +} + +# MARK: Message To Client + +type State data + +type Connection struct { + id: str + details: data +} + +# Workflow history is encoded using schemas/transport. +type WorkflowHistory data + +type Init struct { + connections: list + state: optional + isStateEnabled: bool + rpcs: list + isDatabaseEnabled: bool + queueSize: uint + workflowHistory: optional + isWorkflowEnabled: bool +} + +type ConnectionsResponse struct { + rid: uint + connections: list +} + +type StateResponse struct { + rid: uint + state: optional + isStateEnabled: bool +} + +type ActionResponse struct { + rid: uint + output: data +} + +type TraceQueryResponse struct { + rid: uint + payload: data +} + +type QueueMessageSummary struct { + id: uint + name: str + createdAtMs: uint +} + +type QueueStatus struct { + size: uint + maxSize: uint + messages: list + truncated: bool +} + +type QueueResponse struct { + rid: uint + status: QueueStatus +} + +type WorkflowHistoryResponse struct { + rid: uint + history: optional + isWorkflowEnabled: bool +} + +# Database schema is CBOR-encoded with table metadata. +type DatabaseSchemaResponse struct { + rid: uint + schema: data +} + +# Database table rows result is CBOR-encoded rows. +type DatabaseTableRowsResponse struct { + rid: uint + result: data +} + +type StateUpdated struct { + state: State +} + +type QueueUpdated struct { + queueSize: uint +} + +type WorkflowHistoryUpdated struct { + history: WorkflowHistory +} + +type RpcsListResponse struct { + rid: uint + rpcs: list +} + +type ConnectionsUpdated struct { + connections: list +} + +type Error struct { + message: str +} + +type ToClientBody union { + StateResponse | + ConnectionsResponse | + ActionResponse | + ConnectionsUpdated | + QueueUpdated | + StateUpdated | + WorkflowHistoryUpdated | + RpcsListResponse | + TraceQueryResponse | + QueueResponse | + WorkflowHistoryResponse | + Error | + Init | + DatabaseSchemaResponse | + DatabaseTableRowsResponse +} + +type ToClient struct { + body: ToClientBody +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 78a6fa97d4..e39f6a5cd8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -468,7 +468,7 @@ export class ActorInstance< // Abort listeners try { this.#abortController.abort(); - } catch {} + } catch { } // Wait for run handler to complete await this.#waitForRunHandler(this.#config.options.runStopTimeout); @@ -585,14 +585,14 @@ export class ActorInstance< async processMessage( message: { body: - | { - tag: "ActionRequest"; - val: { id: bigint; name: string; args: unknown }; - } - | { - tag: "SubscriptionRequest"; - val: { eventName: string; subscribe: boolean }; - }; + | { + tag: "ActionRequest"; + val: { id: bigint; name: string; args: unknown }; + } + | { + tag: "SubscriptionRequest"; + val: { eventName: string; subscribe: boolean }; + }; }, conn: Conn, ) { @@ -633,6 +633,8 @@ export class ActorInstance< throw new errors.ActionNotFound(actionName); } + this.#activeKeepAwakeCount++; + this.resetSleepTimer(); const actionSpan = this.startTraceSpan(`actor.action.${actionName}`, { "rivet.action.name": actionName, }); @@ -653,9 +655,15 @@ export class ActorInstance< ); let output: unknown; - if (outputOrPromise instanceof Promise) { + const maybeThenable = outputOrPromise as { + then?: ( + onfulfilled?: unknown, + onrejected?: unknown, + ) => unknown; + }; + if (maybeThenable && typeof maybeThenable.then === "function") { output = await deadline( - outputOrPromise, + Promise.resolve(outputOrPromise), this.#config.options.actionTimeout, ); } else { @@ -715,6 +723,15 @@ export class ActorInstance< status: { code: "OK" }, }); } + this.#activeKeepAwakeCount--; + if (this.#activeKeepAwakeCount < 0) { + this.#activeKeepAwakeCount = 0; + this.#rLog.warn({ + msg: "active keep awake count went below 0, this is a RivetKit bug", + ...EXTRA_ERROR_LOG, + }); + } + this.resetSleepTimer(); this.stateManager.savePersistThrottled(); } } @@ -1306,31 +1323,52 @@ export class ActorInstance< async #setupDatabase() { if ("db" in this.#config && this.#config.db) { - const client = await this.#config.db.createClient({ - actorId: this.#actorId, - overrideRawDatabaseClient: this.driver.overrideRawDatabaseClient - ? () => this.driver.overrideRawDatabaseClient!(this.#actorId) - : undefined, - overrideDrizzleDatabaseClient: - this.driver.overrideDrizzleDatabaseClient + try { + const client = await this.#config.db.createClient({ + actorId: this.#actorId, + overrideRawDatabaseClient: this.driver.overrideRawDatabaseClient ? () => - this.driver.overrideDrizzleDatabaseClient!( - this.#actorId, - ) + this.driver.overrideRawDatabaseClient!(this.#actorId) : undefined, - kv: { - batchPut: (entries) => - this.driver.kvBatchPut(this.#actorId, entries), - batchGet: (keys) => this.driver.kvBatchGet(this.#actorId, keys), - batchDelete: (keys) => - this.driver.kvBatchDelete(this.#actorId, keys), - }, - sqliteVfs: this.driver.sqliteVfs, - }); - this.#rLog.info({ msg: "database migration starting" }); - await this.#config.db.onMigrate?.(client); - this.#rLog.info({ msg: "database migration complete" }); - this.#db = client; + overrideDrizzleDatabaseClient: + this.driver.overrideDrizzleDatabaseClient + ? () => + this.driver.overrideDrizzleDatabaseClient!( + this.#actorId, + ) + : undefined, + kv: { + batchPut: (entries) => + this.driver.kvBatchPut(this.#actorId, entries), + batchGet: (keys) => this.driver.kvBatchGet(this.#actorId, keys), + batchDelete: (keys) => + this.driver.kvBatchDelete(this.#actorId, keys), + }, + sqliteVfs: this.driver.sqliteVfs, + }); + this.#rLog.info({ msg: "database migration starting" }); + await this.#config.db.onMigrate?.(client); + this.#rLog.info({ msg: "database migration complete" }); + this.#db = client; + } catch (error) { + if (error instanceof Error) { + this.#rLog.error({ + msg: "database setup failed", + error: stringifyError(error), + }); + throw error; + } + const wrappedError = new Error( + `Database setup failed: ${String(error)}`, + ); + this.#rLog.error({ + msg: "database setup failed with non-Error object", + error: String(error), + errorType: typeof error, + }); + throw wrappedError; + } + } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts index a7875071f1..3829beb9c9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts @@ -1,14 +1,10 @@ +import type { Database } from "@rivetkit/sqlite-vfs"; import { - type BetterSQLite3Database, - drizzle as sqliteDrizzle, -} from "drizzle-orm/better-sqlite3"; -import { - type SqliteRemoteDatabase, drizzle as proxyDrizzle, + type SqliteRemoteDatabase, } from "drizzle-orm/sqlite-proxy"; -import type { KvVfsOptions } from "../sqlite-vfs"; import type { DatabaseProvider, RawAccess } from "../config"; -import type { Database } from "@rivetkit/sqlite-vfs"; +import type { KvVfsOptions } from "../sqlite-vfs"; export * from "./sqlite-core"; @@ -59,30 +55,65 @@ function createActorKvStore(kv: { }; } +/** + * Mutex to serialize async operations on a wa-sqlite database handle. + * wa-sqlite is not safe for concurrent operations on the same handle. + */ +class DbMutex { + #locked = false; + #waiting: (() => void)[] = []; + + async acquire(): Promise { + while (this.#locked) { + await new Promise((resolve) => this.#waiting.push(resolve)); + } + this.#locked = true; + } + + release(): void { + this.#locked = false; + const next = this.#waiting.shift(); + if (next) { + next(); + } + } + + async run(fn: () => Promise): Promise { + await this.acquire(); + try { + return await fn(); + } finally { + this.release(); + } + } +} + /** * Create a sqlite-proxy async callback from a wa-sqlite Database */ -function createProxyCallback(waDb: Database) { +function createProxyCallback(waDb: Database, mutex: DbMutex) { return async ( sql: string, params: any[], method: "run" | "all" | "values" | "get", ): Promise<{ rows: any }> => { - if (method === "run") { - await waDb.run(sql, params); - return { rows: [] }; - } + return mutex.run(async () => { + if (method === "run") { + await waDb.run(sql, params); + return { rows: [] }; + } - // For all/get/values, use parameterized query - const result = await waDb.query(sql, params); + // For all/get/values, use parameterized query + const result = await waDb.query(sql, params); - // drizzle's mapResultRow accesses rows by column index (positional arrays) - // so we return raw arrays for all methods - if (method === "get") { - return { rows: result.rows[0] }; - } + // drizzle's mapResultRow accesses rows by column index (positional arrays) + // so we return raw arrays for all methods + if (method === "get") { + return { rows: result.rows[0] }; + } - return { rows: result.rows }; + return { rows: result.rows }; + }); }; } @@ -92,24 +123,29 @@ function createProxyCallback(waDb: Database) { */ async function runInlineMigrations( waDb: Database, + mutex: DbMutex, migrations: any, ): Promise { // Create migrations table - await waDb.exec(` + await mutex.run(() => + waDb.exec(` CREATE TABLE IF NOT EXISTS __drizzle_migrations ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL, created_at INTEGER ) - `); + `), + ); // Get the last applied migration let lastCreatedAt = 0; - await waDb.exec( - "SELECT id, hash, created_at FROM __drizzle_migrations ORDER BY created_at DESC LIMIT 1", - (row) => { - lastCreatedAt = Number(row[2]) || 0; - }, + await mutex.run(() => + waDb.exec( + "SELECT id, hash, created_at FROM __drizzle_migrations ORDER BY created_at DESC LIMIT 1", + (row) => { + lastCreatedAt = Number(row[2]) || 0; + }, + ), ); // Apply pending migrations from journal entries @@ -126,11 +162,13 @@ async function runInlineMigrations( if (!sql) continue; // Execute migration SQL - await waDb.exec(sql); + await mutex.run(() => waDb.exec(sql)); // Record migration - await waDb.exec( - `INSERT INTO __drizzle_migrations (hash, created_at) VALUES ('${entry.tag}', ${entry.when})`, + await mutex.run(() => + waDb.exec( + `INSERT INTO __drizzle_migrations (hash, created_at) VALUES ('${entry.tag}', ${entry.when})`, + ), ); } } @@ -142,6 +180,7 @@ export function db< ): DatabaseProvider & RawAccess> { // Store the wa-sqlite Database instance alongside the drizzle client let waDbInstance: Database | null = null; + const mutex = new DbMutex(); return { createClient: async (ctx) => { @@ -157,7 +196,7 @@ export function db< waDbInstance = waDb; // Create the async proxy callback - const callback = createProxyCallback(waDb); + const callback = createProxyCallback(waDb, mutex); // Create the drizzle instance using sqlite-proxy const client = proxyDrizzle(callback, config); @@ -169,30 +208,40 @@ export function db< query: string, ...args: unknown[] ): Promise => { - if (args.length > 0) { - const { rows, columns } = await waDb.query(query, args); - return rows.map((row: unknown[]) => { - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { - rowObj[columns[i]] = row[i]; - } - return rowObj; - }) as TRow[]; - } - - const results: Record[] = []; - let columnNames: string[] | null = null; - await waDb.exec(query, (row: unknown[], columns: string[]) => { - if (!columnNames) { - columnNames = columns; - } - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { - rowObj[columnNames[i]] = row[i]; + return mutex.run(async () => { + if (args.length > 0) { + const result = await waDb.query(query, args); + return result.rows.map((row: unknown[]) => { + const obj: Record = {}; + for ( + let i = 0; + i < result.columns.length; + i++ + ) { + obj[result.columns[i]] = row[i]; + } + return obj; + }) as TRow[]; } - results.push(rowObj); + // Use exec for non-parameterized queries since + // wa-sqlite's query() can crash on some statements. + const results: Record[] = []; + let columnNames: string[] | null = null; + await waDb.exec( + query, + (row: unknown[], columns: string[]) => { + if (!columnNames) { + columnNames = columns; + } + const obj: Record = {}; + for (let i = 0; i < row.length; i++) { + obj[columnNames[i]] = row[i]; + } + results.push(obj); + }, + ); + return results as TRow[]; }); - return results as TRow[]; }, close: async () => { await waDb.close(); @@ -202,7 +251,11 @@ export function db< }, onMigrate: async (_client) => { if (config?.migrations && waDbInstance) { - await runInlineMigrations(waDbInstance, config.migrations); + await runInlineMigrations( + waDbInstance, + mutex, + config.migrations, + ); } }, onDestroy: async (client) => { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 1cd28de725..f5a894802e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -5,6 +5,31 @@ interface DatabaseFactoryConfig { onMigrate?: (db: RawAccess) => Promise | void; } +/** + * Mutex to serialize async operations on a wa-sqlite database handle. + * wa-sqlite is not safe for concurrent operations on the same handle. + */ +class DbMutex { + #locked = false; + #waiting: (() => void)[] = []; + + async run(fn: () => Promise): Promise { + while (this.#locked) { + await new Promise((resolve) => this.#waiting.push(resolve)); + } + this.#locked = true; + try { + return await fn(); + } finally { + this.#locked = false; + const next = this.#waiting.shift(); + if (next) { + next(); + } + } + } +} + /** * Create a KV store wrapper that uses the actor driver's KV operations */ @@ -36,6 +61,8 @@ function createActorKvStore(kv: { export function db({ onMigrate, }: DatabaseFactoryConfig = {}): DatabaseProvider { + const mutex = new DbMutex(); + return { createClient: async (ctx) => { // Check if override is provided @@ -71,37 +98,39 @@ export function db({ return { execute: async < TRow extends Record = Record, - >( - query: string, - ...args: unknown[] - ): Promise => { - if (args.length > 0) { - // Use parameterized query when args are provided - const { rows, columns } = await db.query(query, args); - return rows.map((row: unknown[]) => { - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { - rowObj[columns[i]] = row[i]; + >( + query: string, + ...args: unknown[] + ): Promise => { + return mutex.run(async () => { + if (args.length > 0) { + // Use parameterized query when args are provided + const { rows, columns } = await db.query(query, args); + return rows.map((row: unknown[]) => { + const rowObj: Record = {}; + for (let i = 0; i < row.length; i++) { + rowObj[columns[i]] = row[i]; + } + return rowObj; + }) as TRow[]; } - return rowObj; - }) as TRow[]; - } - // Use exec for non-parameterized queries - const results: Record[] = []; - let columnNames: string[] | null = null; - await db.exec(query, (row: unknown[], columns: string[]) => { - if (!columnNames) { - columnNames = columns; - } - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { - rowObj[columnNames[i]] = row[i]; - } - results.push(rowObj); - }); - return results as TRow[]; - }, + // Use exec for non-parameterized queries + const results: Record[] = []; + let columnNames: string[] | null = null; + await db.exec(query, (row: unknown[], columns: string[]) => { + if (!columnNames) { + columnNames = columns; + } + const rowObj: Record = {}; + for (let i = 0; i < row.length; i++) { + rowObj[columnNames[i]] = row[i]; + } + results.push(rowObj); + }); + return results as TRow[]; + }); + }, close: async () => { await db.close(); }, diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts index cd5b05bccd..701229c093 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts @@ -103,6 +103,73 @@ export class ActorInspector { } } + async getDatabaseSchema(): Promise { + if (!this.isDatabaseEnabled()) { + throw new actorErrors.DatabaseNotEnabled(); + } + + const db = this.actor.db; + + // Get table list from sqlite_master, excluding internal tables. + const tables = await db.execute( + "SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' AND name NOT LIKE '__drizzle_%'", + ) as { name: string; type: string }[]; + + // Serialize all queries to avoid concurrent wa-sqlite access + // which can cause "file is not a database" errors. + const tableInfos = []; + for (const table of tables) { + const quoted = `"${escapeDoubleQuotes(table.name)}"`; + const sample = await db.execute( + `SELECT * FROM ${quoted} LIMIT 1`, + ) as Record[]; + const countResult = await db.execute( + `SELECT COUNT(*) as count FROM ${quoted}`, + ) as { count: number }[]; + + const columnNames = sample?.[0] + ? Object.keys(sample[0]) + : []; + + tableInfos.push({ + table: { schema: "main", name: table.name, type: table.type }, + columns: columnNames.map((name, cid) => ({ + cid, + name, + type: "", + notnull: 0, + dflt_value: null, + pk: 0, + })), + foreignKeys: [], + records: countResult?.[0]?.count ?? 0, + }); + } + + return bufferToArrayBuffer(cbor.encode({ tables: tableInfos })); + } + + async getDatabaseTableRows( + table: string, + limit: number, + offset: number, + ): Promise { + if (!this.isDatabaseEnabled()) { + throw new actorErrors.DatabaseNotEnabled(); + } + + const db = this.actor.db; + const safeLimit = Math.max(0, Math.min(Math.floor(limit), 500)); + const safeOffset = Math.max(0, Math.floor(offset)); + const quoted = `"${escapeDoubleQuotes(table)}"`; + const result = await db.execute( + `SELECT * FROM ${quoted} LIMIT ? OFFSET ?`, + safeLimit, + safeOffset, + ); + return bufferToArrayBuffer(cbor.encode(result)); + } + isStateEnabled() { return this.actor.stateEnabled; } @@ -278,3 +345,8 @@ export class ActorInspector { })); } } + +function escapeDoubleQuotes(value: string): string { + return value.replace(/"/g, '""'); +} + diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts index ca10f33933..e62ea714e7 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts @@ -176,6 +176,58 @@ export async function handleWebSocketInspectorConnect({ }, }, }); + } else if (message.body.tag === "DatabaseSchemaRequest") { + const { id } = message.body.val; + try { + const schema = await inspector.getDatabaseSchema(); + sendMessage(ws, { + body: { + tag: "DatabaseSchemaResponse", + val: { rid: id, schema }, + }, + }); + } catch (error) { + inspectorLogger().warn( + { error }, + "Failed to get database schema", + ); + sendMessage(ws, { + body: { + tag: "Error", + val: { + message: `Failed to get database schema: ${error instanceof Error ? error.message : String(error)}`, + }, + }, + }); + } + } else if (message.body.tag === "DatabaseTableRowsRequest") { + const { id, table, limit, offset } = message.body.val; + try { + const result = await inspector.getDatabaseTableRows( + table, + Number(limit), + Number(offset), + ); + sendMessage(ws, { + body: { + tag: "DatabaseTableRowsResponse", + val: { rid: id, result }, + }, + }); + } catch (error) { + inspectorLogger().warn( + { error }, + "Failed to get database table rows", + ); + sendMessage(ws, { + body: { + tag: "Error", + val: { + message: `Failed to get database rows: ${error instanceof Error ? error.message : String(error)}`, + }, + }, + }); + } } else { assertUnreachable(message.body); } diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts index 87a57d3b2c..8341c0c4a1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/actor-inspector/v2"; +export * from "../../../dist/schemas/actor-inspector/v3"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts index 73379b94f0..75e46d63c5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts @@ -2,13 +2,15 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/actor-inspector/v1"; import * as v2 from "../../../dist/schemas/actor-inspector/v2"; +import * as v3 from "../../../dist/schemas/actor-inspector/v3"; -export const CURRENT_VERSION = 2; +export const CURRENT_VERSION = 3; const EVENTS_DROPPED_ERROR = "inspector.events_dropped"; const WORKFLOW_HISTORY_DROPPED_ERROR = "inspector.workflow_history_dropped"; const QUEUE_DROPPED_ERROR = "inspector.queue_dropped"; const TRACE_DROPPED_ERROR = "inspector.trace_dropped"; +const DATABASE_DROPPED_ERROR = "inspector.database_dropped"; // Converter from v1 to v2: Drop events in Init and add new fields const v1ToClientToV2 = (v1Data: v1.ToClient): v2.ToClient => { @@ -110,6 +112,29 @@ const v2ToClientToV1 = (v2Data: v2.ToClient): v1.ToClient => { return v2Data as unknown as v1.ToClient; }; +// Converter from v2 to v3: v2 messages are a subset of v3 +const v2ToClientToV3 = (v2Data: v2.ToClient): v3.ToClient => { + return v2Data as unknown as v3.ToClient; +}; + +// Converter from v3 to v2: Drop database responses +const v3ToClientToV2 = (v3Data: v3.ToClient): v2.ToClient => { + if ( + v3Data.body.tag === "DatabaseSchemaResponse" || + v3Data.body.tag === "DatabaseTableRowsResponse" + ) { + return { + body: { + tag: "Error", + val: { + message: DATABASE_DROPPED_ERROR, + }, + }, + }; + } + return v3Data as unknown as v2.ToClient; +}; + // Converter from v1 to v2: Drop events requests const v1ToServerToV2 = (v1Data: v1.ToServer): v2.ToServer => { if ( @@ -133,13 +158,31 @@ const v2ToServerToV1 = (v2Data: v2.ToServer): v1.ToServer => { return v2Data as unknown as v1.ToServer; }; -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +// Converter from v2 to v3: v2 messages are a subset of v3 +const v2ToServerToV3 = (v2Data: v2.ToServer): v3.ToServer => { + return v2Data as unknown as v3.ToServer; +}; + +// Converter from v3 to v2: Drop database requests +const v3ToServerToV2 = (v3Data: v3.ToServer): v2.ToServer => { + if ( + v3Data.body.tag === "DatabaseSchemaRequest" || + v3Data.body.tag === "DatabaseTableRowsRequest" + ) { + throw new Error("Cannot convert v3-only database requests to v2"); + } + return v3Data as unknown as v2.ToServer; +}; + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: return v1.encodeToServer(data as v1.ToServer); case 2: return v2.encodeToServer(data as v2.ToServer); + case 3: + return v3.encodeToServer(data as v3.ToServer); default: throw new Error(`Unknown version ${version}`); } @@ -150,21 +193,25 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ return v1.decodeToServer(bytes); case 2: return v2.decodeToServer(bytes); + case 3: + return v3.decodeToServer(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToServerToV2], - serializeConverters: () => [v2ToServerToV1], + deserializeConverters: () => [v1ToServerToV2, v2ToServerToV3], + serializeConverters: () => [v3ToServerToV2, v2ToServerToV1], }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: return v1.encodeToClient(data as v1.ToClient); case 2: return v2.encodeToClient(data as v2.ToClient); + case 3: + return v3.encodeToClient(data as v3.ToClient); default: throw new Error(`Unknown version ${version}`); } @@ -175,10 +222,12 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ return v1.decodeToClient(bytes); case 2: return v2.decodeToClient(bytes); + case 3: + return v3.decodeToClient(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToClientToV2], - serializeConverters: () => [v2ToClientToV1], + deserializeConverters: () => [v1ToClientToV2, v2ToClientToV3], + serializeConverters: () => [v3ToClientToV2, v2ToClientToV1], });