diff --git a/packages/rivetkit/src/drivers/engine/actor-driver.ts b/packages/rivetkit/src/drivers/engine/actor-driver.ts index 10e7fd514..d08e22249 100644 --- a/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -177,16 +177,9 @@ export class EngineActorDriver implements ActorDriver { async readPersistedData(actorId: string): Promise { const handler = this.#actors.get(actorId); if (!handler) throw new Error(`Actor ${actorId} not loaded`); - if (handler.persistedData) return handler.persistedData; - const [value] = await this.#runner.kvGet(actorId, [KEYS.PERSIST_DATA]); - - if (value !== null) { - handler.persistedData = value; - return value; - } else { - return undefined; - } + // This was loaded during actor startup + return handler.persistedData; } async writePersistedData(actorId: string, data: Uint8Array): Promise { @@ -251,11 +244,24 @@ export class EngineActorDriver implements ActorDriver { // Get or create handler let handler = this.#actors.get(actorId); if (!handler) { + // IMPORTANT: We must set the handler in the map synchronously before doing any + // async operations to avoid race conditions where multiple calls might try to + // create the same handler simultaneously. handler = { actorStartPromise: promiseWithResolvers(), - persistedData: serializeEmptyPersistData(input), + persistedData: undefined, }; this.#actors.set(actorId, handler); + + // Load persisted data from storage + const [persistedValue] = await this.#runner.kvGet(actorId, [ + KEYS.PERSIST_DATA, + ]); + + handler.persistedData = + persistedValue !== null + ? persistedValue + : serializeEmptyPersistData(input); } const name = runConfig.name as string; diff --git a/packages/rivetkit/src/manager/router.ts b/packages/rivetkit/src/manager/router.ts index f468df105..4d2e1fed0 100644 --- a/packages/rivetkit/src/manager/router.ts +++ b/packages/rivetkit/src/manager/router.ts @@ -641,7 +641,7 @@ function addManagerRoutes( }); } - router.get("/health", (c) => handleHealthRequest(c, runConfig)); + router.get("/health", (c) => handleHealthRequest(c)); router.get("/metadata", (c) => handleMetadataRequest(c, runConfig)); diff --git a/packages/rivetkit/tests/driver-engine.test.ts b/packages/rivetkit/tests/driver-engine.test.ts index f33f5a0e2..74a0dfab5 100644 --- a/packages/rivetkit/tests/driver-engine.test.ts +++ b/packages/rivetkit/tests/driver-engine.test.ts @@ -18,9 +18,10 @@ runDriverTests({ join(__dirname, "../fixtures/driver-test-suite/registry.ts"), async (registry) => { // Get configuration from environment or use defaults - const endpoint = process.env.RIVET_ENDPOINT || "http://localhost:6420"; + const endpoint = process.env.RIVET_ENDPOINT || "http://127.0.0.1:6420"; const namespace = `test-${crypto.randomUUID().slice(0, 8)}`; const runnerName = "test-runner"; + const token = "dev"; // Create namespace const response = await fetch(`${endpoint}/namespaces`, { @@ -44,6 +45,10 @@ runDriverTests({ // Start the actor driver const runConfig = RunnerConfigSchema.parse({ driver: driverConfig, + endpoint, + namespace, + runnerName, + token, getUpgradeWebSocket: () => undefined, }); const managerDriver = driverConfig.manager(registry.config, runConfig); @@ -55,12 +60,14 @@ runDriverTests({ inlineClient, ); + await new Promise((resolve) => setTimeout(resolve, 1000)); + return { rivetEngine: { endpoint: "http://127.0.0.1:6420", namespace: namespace, runnerName: runnerName, - token: "dev", + token, }, driver: driverConfig, cleanup: async () => {