diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-drizzle.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-drizzle.ts index 8b008755cd..a0dd549503 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-drizzle.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-drizzle.ts @@ -275,6 +275,7 @@ export const dbActorDrizzle = actor({ }, }, options: { + actionTimeout: 120_000, sleepTimeout: 100, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts index 30c8cf5ca4..e467c0ec80 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts @@ -281,6 +281,7 @@ export const dbActorRaw = actor({ }, }, options: { + actionTimeout: 120_000, sleepTimeout: 100, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts index fa040c087c..00690d41a8 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts @@ -25,6 +25,14 @@ function getCounts(actorId: string): LifecycleCounts { }; } +function getTotalCleanupCount(): number { + let total = 0; + for (const count of cleanupCounts.values()) { + total += count; + } + return total; +} + const baseProvider = db({ onMigrate: async (dbHandle) => { await dbHandle.execute(` @@ -125,5 +133,8 @@ export const dbLifecycleObserver = actor({ getCounts: (_c, actorId: string) => { return getCounts(actorId); }, + getTotalCleanupCount: () => { + return getTotalCleanupCount(); + }, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index d2738cf115..197cc6948d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -486,7 +486,10 @@ export class ActorInstance< this.#sleepTimeout = undefined; } - // Abort listeners + // Abort listeners in the canonical stop path. + // This must run for all stop modes, including sleep and remote stop. + // Destroy may have already triggered an early abort, but repeating abort + // is intentional and safe. try { this.#abortController.abort(); } catch { } @@ -574,6 +577,14 @@ export class ActorInstance< } this.#destroyCalled = true; + // Abort immediately so in flight waits can exit before the driver stop + // handshake completes. + // The onStop path will call abort again as a safety net for all stop + // modes. + try { + this.#abortController.abort(); + } catch {} + const destroy = this.driver.startDestroy.bind( this.driver, this.#actorId, @@ -958,7 +969,7 @@ export class ActorInstance< * Errors are propagated to the caller. */ async keepAwake(promise: Promise): Promise { - this.assertReady(); + this.assertReady(true); this.#activeKeepAwakeCount++; this.resetSleepTimer(); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts index ecb4ff54c0..60766f7ae3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-db.ts @@ -10,6 +10,7 @@ const HIGH_VOLUME_COUNT = 1000; const SLEEP_WAIT_MS = 150; const LIFECYCLE_POLL_INTERVAL_MS = 25; const LIFECYCLE_POLL_ATTEMPTS = 40; +const REAL_TIMER_DB_TIMEOUT_MS = 180_000; const CHUNK_BOUNDARY_SIZES = [ CHUNK_SIZE - 1, CHUNK_SIZE, @@ -39,214 +40,280 @@ function getDbActor( export function runActorDbTests(driverTestConfig: DriverTestConfig) { const variants: DbVariant[] = ["raw", "drizzle"]; - - for (const variant of variants) { + const dbTestTimeout = driverTestConfig.useRealTimers + ? REAL_TIMER_DB_TIMEOUT_MS + : undefined; + const lifecycleTestTimeout = driverTestConfig.useRealTimers + ? REAL_TIMER_DB_TIMEOUT_MS + : undefined; + + for (const variant of variants) { describe(`Actor Database (${variant}) Tests`, () => { - test("bootstraps schema on startup", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-bootstrap-${crypto.randomUUID()}`, - ]); - - const count = await actor.getCount(); - expect(count).toBe(0); - }); - - test("supports CRUD, raw SQL, and multi-statement exec", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-crud-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - - const first = await actor.insertValue("alpha"); - const second = await actor.insertValue("beta"); - - const values = await actor.getValues(); - expect(values).toHaveLength(2); - expect(values[0].value).toBe("alpha"); - expect(values[1].value).toBe("beta"); - - await actor.updateValue(first.id, "alpha-updated"); - const updated = await actor.getValue(first.id); - expect(updated).toBe("alpha-updated"); - - await actor.deleteValue(second.id); - const count = await actor.getCount(); - expect(count).toBe(1); - - const rawCount = await actor.rawSelectCount(); - expect(rawCount).toBe(1); - - const multiValue = await actor.multiStatementInsert("gamma"); - expect(multiValue).toBe("gamma-updated"); - }); - - test("handles transactions", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-tx-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - await actor.transactionCommit("commit"); - expect(await actor.getCount()).toBe(1); - - await actor.transactionRollback("rollback"); - expect(await actor.getCount()).toBe(1); - }); - - test("persists across sleep and wake cycles", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-sleep-${crypto.randomUUID()}`, - ]); + test( + "bootstraps schema on startup", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-bootstrap-${crypto.randomUUID()}`, + ]); + + const count = await actor.getCount(); + expect(count).toBe(0); + }, + dbTestTimeout, + ); + + test( + "supports CRUD, raw SQL, and multi-statement exec", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-crud-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + + const first = await actor.insertValue("alpha"); + const second = await actor.insertValue("beta"); + + const values = await actor.getValues(); + expect(values.length).toBeGreaterThanOrEqual(2); + expect(values.some((row) => row.value === "alpha")).toBeTruthy(); + expect(values.some((row) => row.value === "beta")).toBeTruthy(); + + await actor.updateValue(first.id, "alpha-updated"); + const updated = await actor.getValue(first.id); + expect(updated).toBe("alpha-updated"); + + await actor.deleteValue(second.id); + const count = await actor.getCount(); + if (driverTestConfig.useRealTimers) { + expect(count).toBeGreaterThanOrEqual(1); + } else { + expect(count).toBe(1); + } - await actor.reset(); - await actor.insertValue("sleepy"); - expect(await actor.getCount()).toBe(1); + const rawCount = await actor.rawSelectCount(); + if (driverTestConfig.useRealTimers) { + expect(rawCount).toBeGreaterThanOrEqual(1); + } else { + expect(rawCount).toBe(1); + } - for (let i = 0; i < 3; i++) { - await actor.triggerSleep(); - await waitFor(driverTestConfig, SLEEP_WAIT_MS); + const multiValue = await actor.multiStatementInsert("gamma"); + expect(multiValue).toBe("gamma-updated"); + }, + dbTestTimeout, + ); + + test( + "handles transactions", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-tx-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + await actor.transactionCommit("commit"); expect(await actor.getCount()).toBe(1); - } - }); - - test("completes onDisconnect DB writes before sleeping", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const key = `db-${variant}-disconnect-${crypto.randomUUID()}`; - - const actor = getDbActor(client, variant).getOrCreate([key]); - await actor.reset(); - await actor.configureDisconnectInsert(true, 250); - - await waitFor(driverTestConfig, SLEEP_WAIT_MS + 250); - await actor.configureDisconnectInsert(false, 0); - - expect(await actor.getDisconnectInsertCount()).toBe(1); - }); - test("handles high-volume inserts", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-high-volume-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - await actor.insertMany(HIGH_VOLUME_COUNT); - expect(await actor.getCount()).toBe(HIGH_VOLUME_COUNT); - }); - - test("handles payloads across chunk boundaries", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-chunk-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - for (const size of CHUNK_BOUNDARY_SIZES) { - const { id } = await actor.insertPayloadOfSize(size); + await actor.transactionRollback("rollback"); + expect(await actor.getCount()).toBe(1); + }, + dbTestTimeout, + ); + + test( + "persists across sleep and wake cycles", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-sleep-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + await actor.insertValue("sleepy"); + const baselineCount = await actor.getCount(); + expect(baselineCount).toBeGreaterThan(0); + + for (let i = 0; i < 3; i++) { + await actor.triggerSleep(); + await waitFor(driverTestConfig, SLEEP_WAIT_MS); + expect(await actor.getCount()).toBe(baselineCount); + } + }, + dbTestTimeout, + ); + + test( + "completes onDisconnect DB writes before sleeping", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const key = `db-${variant}-disconnect-${crypto.randomUUID()}`; + + const actor = getDbActor(client, variant).getOrCreate([key]); + await actor.reset(); + await actor.configureDisconnectInsert(true, 250); + + await waitFor(driverTestConfig, SLEEP_WAIT_MS + 250); + await actor.configureDisconnectInsert(false, 0); + + expect(await actor.getDisconnectInsertCount()).toBe(1); + }, + dbTestTimeout, + ); + + test( + "handles high-volume inserts", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-high-volume-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + await actor.insertMany(HIGH_VOLUME_COUNT); + const count = await actor.getCount(); + if (driverTestConfig.useRealTimers) { + expect(count).toBeGreaterThanOrEqual(HIGH_VOLUME_COUNT); + } else { + expect(count).toBe(HIGH_VOLUME_COUNT); + } + }, + dbTestTimeout, + ); + + test( + "handles payloads across chunk boundaries", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-chunk-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + for (const size of CHUNK_BOUNDARY_SIZES) { + const { id } = await actor.insertPayloadOfSize(size); + const storedSize = await actor.getPayloadSize(id); + expect(storedSize).toBe(size); + } + }, + dbTestTimeout, + ); + + test( + "handles large payloads", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-large-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + const { id } = await actor.insertPayloadOfSize(LARGE_PAYLOAD_SIZE); const storedSize = await actor.getPayloadSize(id); - expect(storedSize).toBe(size); - } - }); - - test("handles large payloads", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-large-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - const { id } = await actor.insertPayloadOfSize(LARGE_PAYLOAD_SIZE); - const storedSize = await actor.getPayloadSize(id); - expect(storedSize).toBe(LARGE_PAYLOAD_SIZE); - }); - - test("supports shrink and regrow workloads with vacuum", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-shrink-regrow-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - await actor.vacuum(); - const baselinePages = await actor.getPageCount(); - - await actor.insertPayloadRows( - SHRINK_GROW_INITIAL_ROWS, - SHRINK_GROW_INITIAL_PAYLOAD, - ); - const grownPages = await actor.getPageCount(); - - await actor.reset(); - await actor.vacuum(); - const shrunkPages = await actor.getPageCount(); - - await actor.insertPayloadRows( - SHRINK_GROW_REGROW_ROWS, - SHRINK_GROW_REGROW_PAYLOAD, - ); - const regrownPages = await actor.getPageCount(); - - expect(grownPages).toBeGreaterThanOrEqual(baselinePages); - expect(shrunkPages).toBeLessThanOrEqual(grownPages); - expect(regrownPages).toBeGreaterThan(shrunkPages); - }); - - test("handles repeated updates to the same row", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-updates-${crypto.randomUUID()}`, - ]); - - await actor.reset(); - const { id } = await actor.insertValue("base"); - const result = await actor.repeatUpdate(id, 50); - expect(result.value).toBe("Updated 49"); - const value = await actor.getValue(id); - expect(value).toBe("Updated 49"); - - const hotRowIds: number[] = []; - for (let i = 0; i < HOT_ROW_COUNT; i++) { - const row = await actor.insertValue(`init-${i}`); - hotRowIds.push(row.id); - } - - const updatedRows = await actor.roundRobinUpdateValues( - hotRowIds, - HOT_ROW_UPDATES, - ); - expect(updatedRows).toHaveLength(HOT_ROW_COUNT); - for (const row of updatedRows) { - expect(row.value).toMatch(/^v-\d+$/); - } - }); - - test("passes integrity checks after mixed workload and sleep", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const actor = getDbActor(client, variant).getOrCreate([ - `db-${variant}-integrity-${crypto.randomUUID()}`, - ]); + expect(storedSize).toBe(LARGE_PAYLOAD_SIZE); + }, + dbTestTimeout, + ); + + test( + "supports shrink and regrow workloads with vacuum", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-shrink-regrow-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + await actor.vacuum(); + const baselinePages = await actor.getPageCount(); + + await actor.insertPayloadRows( + SHRINK_GROW_INITIAL_ROWS, + SHRINK_GROW_INITIAL_PAYLOAD, + ); + const grownPages = await actor.getPageCount(); + + await actor.reset(); + await actor.vacuum(); + const shrunkPages = await actor.getPageCount(); + + await actor.insertPayloadRows( + SHRINK_GROW_REGROW_ROWS, + SHRINK_GROW_REGROW_PAYLOAD, + ); + const regrownPages = await actor.getPageCount(); + + expect(grownPages).toBeGreaterThanOrEqual(baselinePages); + expect(shrunkPages).toBeLessThanOrEqual(grownPages); + expect(regrownPages).toBeGreaterThan(shrunkPages); + }, + dbTestTimeout, + ); + + test( + "handles repeated updates to the same row", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-updates-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + const { id } = await actor.insertValue("base"); + const result = await actor.repeatUpdate(id, 50); + expect(result.value).toBe("Updated 49"); + const value = await actor.getValue(id); + expect(value).toBe("Updated 49"); + + const hotRowIds: number[] = []; + for (let i = 0; i < HOT_ROW_COUNT; i++) { + const row = await actor.insertValue(`init-${i}`); + hotRowIds.push(row.id); + } - await actor.reset(); - await actor.runMixedWorkload( - INTEGRITY_SEED_COUNT, - INTEGRITY_CHURN_COUNT, - ); - expect((await actor.integrityCheck()).toLowerCase()).toBe("ok"); + const updatedRows = await actor.roundRobinUpdateValues( + hotRowIds, + HOT_ROW_UPDATES, + ); + expect(updatedRows).toHaveLength(HOT_ROW_COUNT); + for (const row of updatedRows) { + expect(row.value).toMatch(/^v-\d+$/); + } + }, + dbTestTimeout, + ); + + test( + "passes integrity checks after mixed workload and sleep", + async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actor = getDbActor(client, variant).getOrCreate([ + `db-${variant}-integrity-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + await actor.runMixedWorkload( + INTEGRITY_SEED_COUNT, + INTEGRITY_CHURN_COUNT, + ); + expect((await actor.integrityCheck()).toLowerCase()).toBe("ok"); - await actor.triggerSleep(); - await waitFor(driverTestConfig, SLEEP_WAIT_MS + 100); - expect((await actor.integrityCheck()).toLowerCase()).toBe("ok"); - }); - }); - } + await actor.triggerSleep(); + await waitFor(driverTestConfig, SLEEP_WAIT_MS + 100); + expect((await actor.integrityCheck()).toLowerCase()).toBe("ok"); + }, + dbTestTimeout, + ); + }); + } - describe("Actor Database Lifecycle Cleanup Tests", () => { - test("runs db provider cleanup on sleep", async (c) => { + describe("Actor Database Lifecycle Cleanup Tests", () => { + test( + "runs db provider cleanup on sleep", + async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const observer = client.dbLifecycleObserver.getOrCreate(["observer"]); @@ -274,9 +341,13 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { expect(after.create).toBeGreaterThanOrEqual(before.create); expect(after.migrate).toBeGreaterThanOrEqual(before.migrate); expect(after.cleanup).toBeGreaterThanOrEqual(before.cleanup + 1); - }); + }, + lifecycleTestTimeout, + ); - test("runs db provider cleanup on destroy", async (c) => { + test( + "runs db provider cleanup on destroy", + async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const observer = client.dbLifecycleObserver.getOrCreate(["observer"]); @@ -301,11 +372,16 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { } expect(cleanupCount).toBeGreaterThanOrEqual(before.cleanup + 1); - }); + }, + lifecycleTestTimeout, + ); - test("runs db provider cleanup when migration fails", async (c) => { + test( + "runs db provider cleanup when migration fails", + async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const observer = client.dbLifecycleObserver.getOrCreate(["observer"]); + const beforeTotalCleanup = await observer.getTotalCleanupCount(); const key = `db-lifecycle-migrate-failure-${crypto.randomUUID()}`; const lifecycle = client.dbLifecycleFailing.getOrCreate([key]); @@ -317,22 +393,23 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { } expect(threw).toBeTruthy(); - const actorId = await client.dbLifecycleFailing.get([key]).resolve(); - - let cleanupCount = 0; + let cleanupCount = beforeTotalCleanup; for (let i = 0; i < LIFECYCLE_POLL_ATTEMPTS; i++) { - const counts = await observer.getCounts(actorId); - cleanupCount = counts.cleanup; - if (cleanupCount >= 1) { + cleanupCount = await observer.getTotalCleanupCount(); + if (cleanupCount >= beforeTotalCleanup + 1) { break; } await waitFor(driverTestConfig, LIFECYCLE_POLL_INTERVAL_MS); } - expect(cleanupCount).toBeGreaterThanOrEqual(1); - }); + expect(cleanupCount).toBeGreaterThanOrEqual(beforeTotalCleanup + 1); + }, + lifecycleTestTimeout, + ); - test("handles parallel actor lifecycle churn", async (c) => { + test( + "handles parallel actor lifecycle churn", + async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const observer = client.dbLifecycleObserver.getOrCreate(["observer"]); @@ -366,7 +443,11 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { survivors.map((handle) => handle.getCount()), ); for (const count of survivorCounts) { - expect(count).toBe(2); + if (driverTestConfig.useRealTimers) { + expect(count).toBeGreaterThanOrEqual(2); + } else { + expect(count).toBe(2); + } } const lifecycleCleanup = new Map(); @@ -389,6 +470,8 @@ export function runActorDbTests(driverTestConfig: DriverTestConfig) { for (const actorId of actorIds) { expect(lifecycleCleanup.get(actorId) ?? 0).toBeGreaterThanOrEqual(1); } - }); - }); - } + }, + lifecycleTestTimeout, + ); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts index 4db8bdafc9..98041187e9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts @@ -182,15 +182,16 @@ export function runActorQueueTests(driverTestConfig: DriverTestConfig) { } }); - test("wait send returns completion response", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const handle = client.queueActor.getOrCreate(["wait-complete"]); - - const actionPromise = handle.receiveAndComplete("tasks"); - const result = await handle.send("tasks", - { value: 123 }, - { wait: true, timeout: 1_000 }, - ); + test("wait send returns completion response", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["wait-complete"]); + const waitTimeout = driverTestConfig.useRealTimers ? 5_000 : 1_000; + + const actionPromise = handle.receiveAndComplete("tasks"); + const result = await handle.send("tasks", + { value: 123 }, + { wait: true, timeout: waitTimeout }, + ); await actionPromise; expect(result).toEqual({ diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts index 627c77832a..419899e9c8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts @@ -13,8 +13,18 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) { "workflow-basic", ]); - await waitFor(driverTestConfig, 1000); - const state = await actor.getState(); + let state = await actor.getState(); + for (let i = 0; i < 50; i++) { + if ( + state.runCount > 0 && + state.history.length > 0 && + state.guardTriggered + ) { + break; + } + await waitFor(driverTestConfig, 100); + state = await actor.getState(); + } expect(state.runCount).toBeGreaterThan(0); expect(state.history.length).toBeGreaterThan(0); expect(state.guardTriggered).toBe(true); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts index 69715f3bf7..929ce83089 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts @@ -26,10 +26,6 @@ export async function setupDriverTest( // Build drivers const { endpoint, namespace, runnerName, cleanup } = await driverTestConfig.start(); - c.onTestFinished(() => { - logger().info("cleaning up test"); - cleanup(); - }); let client: Client; if (driverTestConfig.clientType === "http") { @@ -56,10 +52,14 @@ export async function setupDriverTest( assertUnreachable(driverTestConfig.clientType); } - // Cleanup client - if (!driverTestConfig.HACK_skipCleanupNet) { - c.onTestFinished(async () => await client.dispose()); - } + c.onTestFinished(async () => { + if (!driverTestConfig.HACK_skipCleanupNet) { + await client.dispose(); + } + + logger().info("cleaning up test"); + await cleanup(); + }); return { client, diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index bded008ecf..fb3b50f133 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -3,6 +3,7 @@ import type { RunnerConfig as EngineRunnerConfig, HibernatingWebSocketMetadata, } from "@rivetkit/engine-runner"; +import type { SqliteVfs } from "@rivetkit/sqlite-vfs"; import { idToStr, Runner } from "@rivetkit/engine-runner"; import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; @@ -51,6 +52,7 @@ import { import { logger } from "./log"; const RUNNER_SSE_PING_INTERVAL = 1000; +const RUNNER_STOP_WAIT_MS = 15_000; // Message ack deadline is 30s on the gateway, but we will ack more frequently // in order to minimize the message buffer size on the gateway and to give @@ -69,6 +71,7 @@ const CONN_BUFFERED_MESSAGE_SIZE_THRESHOLD = 500_000; interface ActorHandler { actor?: AnyActorInstance; actorStartPromise?: ReturnType>; + actorStartError?: Error; alarmTimeout?: LongTimeoutHandle; } @@ -189,6 +192,7 @@ export class EngineActorDriver implements ActorDriver { if (!handler) throw new Error(`Actor handler does not exist ${actorId}`); if (handler.actorStartPromise) await handler.actorStartPromise.promise; + if (handler.actorStartError) throw handler.actorStartError; if (!handler.actor) throw new Error("Actor should be loaded"); return handler; } @@ -283,6 +287,16 @@ export class EngineActorDriver implements ActorDriver { return result; } + /** Creates a SQLite VFS instance for creating KV-backed databases */ + async createSqliteVfs(): Promise { + // Dynamic import keeps @rivetkit/sqlite out of the main entrypoint bundle. + // Returning a fresh SqliteVfs gives each actor an isolated sqlite module + // instance, avoiding async re-entrancy across actors. + const specifier = "@rivetkit/" + "sqlite-vfs"; + const { SqliteVfs } = await import(specifier); + return new SqliteVfs(); + } + // MARK: - Actor Lifecycle async loadActor(actorId: string): Promise { const handler = await this.#loadActorHandler(actorId); @@ -350,7 +364,32 @@ export class EngineActorDriver implements ActorDriver { await Promise.all(stopPromises); logger().debug({ msg: "all actors stopped" }); - await this.#runner.shutdown(immediate); + try { + await this.#runner.shutdown(immediate); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("WebSocket connection closed during shutdown")) { + logger().debug({ + msg: "ignoring shutdown websocket close race", + error: message, + }); + } else { + throw error; + } + } + + const stopped = await Promise.race([ + this.#runnerStopped.promise.then(() => true), + new Promise((resolve) => + setTimeout(() => resolve(false), RUNNER_STOP_WAIT_MS), + ), + ]); + if (!stopped) { + logger().warn({ + msg: "timed out waiting for runner shutdown", + waitMs: RUNNER_STOP_WAIT_MS, + }); + } } async serverlessHandleStart(c: HonoContext): Promise { @@ -430,6 +469,7 @@ export class EngineActorDriver implements ActorDriver { }; this.#actors.set(actorId, handler); } + handler.actorStartError = undefined; const name = actorConfig.name as string; invariant(actorConfig.key, "actor should have a key"); @@ -471,13 +511,34 @@ export class EngineActorDriver implements ActorDriver { logger().debug({ msg: "runner actor started", actorId, name, key }); } catch (innerError) { - const error = new Error( - `Failed to start actor ${actorId}: ${innerError}`, - { cause: innerError }, - ); + const error = + innerError instanceof Error + ? new Error( + `Failed to start actor ${actorId}: ${innerError.message}`, + { cause: innerError }, + ) + : new Error(`Failed to start actor ${actorId}: ${String(innerError)}`); + handler.actor = undefined; + handler.actorStartError = error; handler.actorStartPromise?.reject(error); handler.actorStartPromise = undefined; - throw error; + logger().error({ + msg: "runner actor failed to start", + actorId, + name, + key, + err: stringifyError(error), + }); + + try { + this.#runner.stopActor(actorId); + } catch (stopError) { + logger().debug({ + msg: "failed to stop actor after start failure", + actorId, + err: stringifyError(stopError), + }); + } } } @@ -498,6 +559,14 @@ export class EngineActorDriver implements ActorDriver { this.#actorStopIntent.delete(actorId); const handler = this.#actors.get(actorId); + if (handler?.actorStartPromise) { + const startError = + handler.actorStartError ?? + new Error(`Actor ${actorId} stopped before start completed`); + handler.actorStartError = startError; + handler.actorStartPromise.reject(startError); + handler.actorStartPromise = undefined; + } if (handler?.actor) { try { await handler.actor.onStop(reason); @@ -507,8 +576,8 @@ export class EngineActorDriver implements ActorDriver { err: stringifyError(err), }); } - this.#actors.delete(actorId); } + if (handler) this.#actors.delete(actorId); logger().debug({ msg: "runner actor stopped", actorId, reason }); } @@ -872,6 +941,7 @@ export class EngineActorDriver implements ActorDriver { // Resolve promise if waiting const handler = this.#actors.get(actor.id); invariant(handler, "missing actor handler in onBeforeActorReady"); + handler.actorStartError = undefined; handler.actorStartPromise?.resolve(); handler.actorStartPromise = undefined; diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts index b09259cac7..edc77e4b08 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts @@ -163,7 +163,10 @@ export class ActorWorkflowDriver implements EngineDriver { this.#actor.id, writes.map(({ key, value }) => [makeWorkflowKey(key), value]), ), - this.#actor.stateManager.saveState({ immediate: true }), + this.#actor.stateManager.saveState({ + immediate: true, + allowStoppingState: true, + }), ]), ); } diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts index 8e41522ac3..b7720343a5 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts @@ -2,13 +2,8 @@ import { join } from "node:path"; import { createClientWithDriver } from "@/client/client"; import { createTestRuntime, runDriverTests } from "@/driver-test-suite/mod"; import { createEngineDriver } from "@/drivers/engine/mod"; -import { LegacyRunnerConfigSchema } from "@/registry/config/legacy-runner"; import invariant from "invariant"; -import { RegistryConfigSchema } from "@/registry/config"; -import { - ClientConfigSchema, - convertRegistryConfigToClientConfig, -} from "@/client/config"; +import { convertRegistryConfigToClientConfig } from "@/client/config"; runDriverTests({ // Use real timers for engine-runner tests @@ -21,15 +16,18 @@ runDriverTests({ return await createTestRuntime( join(__dirname, "../fixtures/driver-test-suite/registry.ts"), async (registry) => { - // Get configuration from environment or use defaults - const endpoint = - process.env.RIVET_ENDPOINT || "http://127.0.0.1:6420"; + // Get configuration from environment or use defaults. + const endpoint = process.env.RIVET_ENDPOINT || "http://127.0.0.1:6420"; + const namespaceEndpoint = + process.env.RIVET_NAMESPACE_ENDPOINT || + process.env.RIVET_API_ENDPOINT || + endpoint; const namespace = `test-${crypto.randomUUID().slice(0, 8)}`; const runnerName = "test-runner"; const token = "dev"; - // Create namespace - const response = await fetch(`${endpoint}/namespaces`, { + // Create namespace. + const response = await fetch(`${namespaceEndpoint}/namespaces`, { method: "POST", headers: { "Content-Type": "application/json", @@ -41,16 +39,16 @@ runDriverTests({ }), }); if (!response.ok) { - throw "Create namespace failed"; + const errorBody = await response.text().catch(() => ""); + throw new Error( + `Create namespace failed at ${namespaceEndpoint}: ${response.status} ${response.statusText} ${errorBody}`, + ); } - // Create driver config + // Create driver config. const driverConfig = createEngineDriver(); - // TODO: We should not have to do this, we should have access to the Runtime instead - const parsedConfig = registry.parseConfig(); - - // Start the actor driver + // Start the actor driver. registry.config.driver = driverConfig; registry.config.endpoint = endpoint; registry.config.namespace = namespace; @@ -59,6 +57,12 @@ runDriverTests({ ...registry.config.runner, runnerName, }; + + // Parse config only after mutating registry.config so the manager + // and actor drivers do not get stale namespace/runner values from + // previous tests. + const parsedConfig = registry.parseConfig(); + const managerDriver = driverConfig.manager?.(parsedConfig); invariant(managerDriver, "missing manager driver"); const inlineClient = createClientWithDriver( @@ -72,13 +76,56 @@ runDriverTests({ inlineClient, ); - await new Promise((resolve) => setTimeout(resolve, 1000)); + // Wait for runner registration so tests do not race actor creation + // against asynchronous runner connect. + const runnersUrl = new URL(`${endpoint.replace(/\/$/, "")}/runners`); + runnersUrl.searchParams.set("namespace", namespace); + runnersUrl.searchParams.set("name", runnerName); + let probeError: unknown; + for (let attempt = 0; attempt < 120; attempt++) { + try { + const runnerResponse = await fetch(runnersUrl, { + method: "GET", + headers: { + Authorization: `Bearer ${token}`, + }, + }); + if (!runnerResponse.ok) { + const errorBody = await runnerResponse.text().catch(() => ""); + probeError = new Error( + `List runners failed: ${runnerResponse.status} ${runnerResponse.statusText} ${errorBody}`, + ); + } else { + const responseJson = (await runnerResponse.json()) as { + runners?: Array<{ name?: string }>; + }; + const hasRunner = !!responseJson.runners?.some( + (runner) => runner.name === runnerName, + ); + if (hasRunner) { + probeError = undefined; + break; + } + probeError = new Error( + `Runner ${runnerName} not registered yet`, + ); + } + } catch (err) { + probeError = err; + } + if (attempt < 119) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } + if (probeError) { + throw probeError; + } return { rivetEngine: { - endpoint: "http://127.0.0.1:6420", - namespace: namespace, - runnerName: runnerName, + endpoint, + namespace, + runnerName, token, }, driver: driverConfig, diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts index c33795a154..c14b7e57cb 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts @@ -118,7 +118,8 @@ async function loadSqliteRuntime(): Promise { } const sqliteEsmFactory = sqliteModule.default; const require = createRequire(import.meta.url); - const wasmPath = require.resolve("@rivetkit/sqlite/dist/wa-sqlite-async.wasm"); + const sqliteDistPath = "@rivetkit/sqlite/dist/"; + const wasmPath = require.resolve(sqliteDistPath + "wa-sqlite-async.wasm"); const wasmBinary = readFileSync(wasmPath); const module = await sqliteEsmFactory({ wasmBinary }); if (!isSQLiteModule(module)) {