Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions frontend/src/components/actors/actors-actor-details.tsx

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,43 @@ import { migrations } from "./db/migrations";
import { schema } from "./db/schema";

export const dbActorDrizzle = actor({
state: {
disconnectInsertEnabled: false,
disconnectInsertDelayMs: 0,
},
db: db({
schema,
migrations,
}),
onDisconnect: async (c) => {
if (!c.state.disconnectInsertEnabled) {
return;
}

if (c.state.disconnectInsertDelayMs > 0) {
await new Promise<void>((resolve) =>
setTimeout(resolve, c.state.disconnectInsertDelayMs),
);
}

await c.db.execute(
`INSERT INTO test_data (value, payload, created_at) VALUES ('__disconnect__', '', ${Date.now()})`,
);
},
actions: {
configureDisconnectInsert: (c, enabled: boolean, delayMs: number) => {
c.state.disconnectInsertEnabled = enabled;
c.state.disconnectInsertDelayMs = Math.max(
0,
Math.floor(delayMs),
);
},
getDisconnectInsertCount: async (c) => {
const results = await c.db.execute<{ count: number }>(
`SELECT COUNT(*) as count FROM test_data WHERE value = '__disconnect__'`,
);
return results[0]?.count ?? 0;
},
reset: async (c) => {
await c.db.execute(`DELETE FROM test_data`);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { actor } from "rivetkit";
import { db } from "rivetkit/db";

export const dbActorRaw = actor({
state: {
disconnectInsertEnabled: false,
disconnectInsertDelayMs: 0,
},
db: db({
onMigrate: async (db) => {
await db.execute(`
Expand All @@ -14,7 +18,35 @@ export const dbActorRaw = actor({
`);
},
}),
onDisconnect: async (c) => {
if (!c.state.disconnectInsertEnabled) {
return;
}

if (c.state.disconnectInsertDelayMs > 0) {
await new Promise<void>((resolve) =>
setTimeout(resolve, c.state.disconnectInsertDelayMs),
);
}

await c.db.execute(
`INSERT INTO test_data (value, payload, created_at) VALUES ('__disconnect__', '', ${Date.now()})`,
);
},
actions: {
configureDisconnectInsert: (c, enabled: boolean, delayMs: number) => {
c.state.disconnectInsertEnabled = enabled;
c.state.disconnectInsertDelayMs = Math.max(
0,
Math.floor(delayMs),
);
},
getDisconnectInsertCount: async (c) => {
const results = await c.db.execute<{ count: number }>(
`SELECT COUNT(*) as count FROM test_data WHERE value = '__disconnect__'`,
);
return results[0]?.count ?? 0;
},
reset: async (c) => {
await c.db.execute(`DELETE FROM test_data`);
},
Expand Down
9 changes: 6 additions & 3 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,17 @@ export class Conn<
});
}

this.#actor.connectionManager.connDisconnected(this);
try {
await this.#actor.connectionManager.connDisconnected(this);
} finally {
this[CONN_DRIVER_SYMBOL] = undefined;
}
} else {
this.#actor.rLog.warn({
msg: "missing connection driver state for disconnect",
conn: this.id,
});
this[CONN_DRIVER_SYMBOL] = undefined;
}

this[CONN_DRIVER_SYMBOL] = undefined;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class ConnectionManager<
> {
#actor: ActorInstance<S, CP, CS, V, I, DB, E, Q>;
#connections = new Map<ConnId, Conn<S, CP, CS, V, I, DB, E, Q>>();
#pendingDisconnectCount = 0;

/** Connections that have had their state changed and need to be persisted. */
#connsWithPersistChanged = new Set<ConnId>();
Expand All @@ -70,6 +71,10 @@ export class ConnectionManager<
return this.#connsWithPersistChanged;
}

get pendingDisconnectCount(): number {
return this.#pendingDisconnectCount;
}

clearConnWithPersistChanged() {
this.#connsWithPersistChanged.clear();
}
Expand Down Expand Up @@ -307,9 +312,8 @@ export class ConnectionManager<
this.#actor.eventManager.removeSubscription(eventName, conn, true);
}

this.#actor.resetSleepTimer();

this.#actor.inspector.emitter.emit("connectionsUpdated");
this.#pendingDisconnectCount += 1;

const attributes = {
"rivet.conn.id": conn.id,
Expand All @@ -321,7 +325,6 @@ export class ConnectionManager<
attributes,
);

// Trigger disconnect
try {
if (this.#actor.config.onDisconnect) {
const result = this.#actor.traces.withSpan(span, () =>
Expand All @@ -336,23 +339,9 @@ export class ConnectionManager<
span,
);
if (result instanceof Promise) {
result
.then(() => {
this.#actor.endTraceSpan(span, { code: "OK" });
})
.catch((error) => {
this.#actor.endTraceSpan(span, {
code: "ERROR",
message: stringifyError(error),
});
this.#actor.rLog.error({
msg: "error in `onDisconnect`",
error: stringifyError(error),
});
});
} else {
this.#actor.endTraceSpan(span, { code: "OK" });
await result;
}
this.#actor.endTraceSpan(span, { code: "OK" });
} else {
this.#actor.emitTraceEvent(
"connection.disconnect",
Expand All @@ -370,29 +359,35 @@ export class ConnectionManager<
msg: "error in `onDisconnect`",
error: stringifyError(error),
});
}

// Remove from connsWithPersistChanged after onDisconnect to handle any
// state changes made during the disconnect callback. Disconnected connections
// are removed from KV storage via kvBatchDelete below, not through the
// normal persist save flow, so they should not trigger persist saves.
this.#connsWithPersistChanged.delete(conn.id);

// Remove from KV storage
if (conn.isHibernatable) {
const key = makeConnKey(conn.id);
try {
await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]);
this.#actor.rLog.debug({
msg: "removed connection from KV",
connId: conn.id,
});
} catch (err) {
this.#actor.rLog.error({
msg: "kvBatchDelete failed for conn",
err: stringifyError(err),
});
} finally {
// Remove from connsWithPersistChanged after onDisconnect to handle any
// state changes made during the disconnect callback. Disconnected connections
// are removed from KV storage via kvBatchDelete below, not through the
// normal persist save flow, so they should not trigger persist saves.
this.#connsWithPersistChanged.delete(conn.id);

// Remove from KV storage.
if (conn.isHibernatable) {
const key = makeConnKey(conn.id);
try {
await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]);
this.#actor.rLog.debug({
msg: "removed connection from KV",
connId: conn.id,
});
} catch (err) {
this.#actor.rLog.error({
msg: "kvBatchDelete failed for conn",
err: stringifyError(err),
});
}
}

this.#pendingDisconnectCount = Math.max(
0,
this.#pendingDisconnectCount - 1,
);
this.#actor.resetSleepTimer();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum CanSleep {
NotReady,
NotStarted,
ActiveConns,
ActiveDisconnectCallbacks,
ActiveHonoHttpRequests,
ActiveKeepAwake,
ActiveRun,
Expand Down Expand Up @@ -1533,6 +1534,10 @@ export class ActorInstance<
// }
}

if (this.connectionManager.pendingDisconnectCount > 0) {
return CanSleep.ActiveDisconnectCallbacks;
}

return CanSleep.Yes;
}

Expand Down
30 changes: 26 additions & 4 deletions rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,21 @@ class DbMutex {
/**
* Create a sqlite-proxy async callback from a wa-sqlite Database
*/
function createProxyCallback(waDb: Database, mutex: DbMutex) {
function createProxyCallback(
waDb: Database,
mutex: DbMutex,
isClosed: () => boolean,
) {
return async (
sql: string,
params: any[],
method: "run" | "all" | "values" | "get",
): Promise<{ rows: any }> => {
return mutex.run(async () => {
if (isClosed()) {
throw new Error("database is closed");
}

if (method === "run") {
await waDb.run(sql, params);
return { rows: [] };
Expand Down Expand Up @@ -194,9 +202,15 @@ export function db<
const kvStore = createActorKvStore(ctx.kv);
const waDb = await ctx.sqliteVfs.open(ctx.actorId, kvStore);
waDbInstance = waDb;
let closed = false;
const ensureOpen = () => {
if (closed) {
throw new Error("database is closed");
}
};

// Create the async proxy callback
const callback = createProxyCallback(waDb, mutex);
const callback = createProxyCallback(waDb, mutex, () => closed);

// Create the drizzle instance using sqlite-proxy
const client = proxyDrizzle<TSchema>(callback, config);
Expand All @@ -209,6 +223,8 @@ export function db<
...args: unknown[]
): Promise<TRow[]> => {
return mutex.run(async () => {
ensureOpen();

if (args.length > 0) {
const result = await waDb.query(query, args);
return result.rows.map((row: unknown[]) => {
Expand Down Expand Up @@ -244,8 +260,14 @@ export function db<
});
},
close: async () => {
await waDb.close();
waDbInstance = null;
await mutex.run(async () => {
if (closed) {
return;
}
closed = true;
await waDb.close();
waDbInstance = null;
});
},
} satisfies RawAccess);
},
Expand Down
16 changes: 15 additions & 1 deletion rivetkit-typescript/packages/rivetkit/src/db/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ export function db({

const kvStore = createActorKvStore(ctx.kv);
const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore);
let closed = false;
const ensureOpen = () => {
if (closed) {
throw new Error("database is closed");
}
};

return {
execute: async <
Expand All @@ -103,6 +109,8 @@ export function db({
...args: unknown[]
): Promise<TRow[]> => {
return mutex.run(async () => {
ensureOpen();

if (args.length > 0) {
// Use parameterized query when args are provided
const { rows, columns } = await db.query(query, args);
Expand Down Expand Up @@ -132,7 +140,13 @@ export function db({
});
},
close: async () => {
await db.close();
await mutex.run(async () => {
if (closed) {
return;
}
closed = true;
await db.close();
});
},
} satisfies RawAccess;
},
Expand Down
Loading
Loading