diff --git a/engine/packages/epoxy/src/http_client.rs b/engine/packages/epoxy/src/http_client.rs index 1eee2eee76..1e3a0d4af0 100644 --- a/engine/packages/epoxy/src/http_client.rs +++ b/engine/packages/epoxy/src/http_client.rs @@ -1,4 +1,4 @@ -use anyhow::*; +use anyhow::{Context, Result, bail}; use epoxy_protocol::{ PROTOCOL_VERSION, protocol::{self, ReplicaId}, @@ -37,7 +37,13 @@ where Fut: Future> + Send, T: Send, { - let quorum_size = utils::calculate_quorum(replica_ids.len(), quorum_type); + let target_responses = utils::calculate_fanout_quorum(replica_ids.len(), quorum_type); + + if target_responses == 0 { + tracing::warn!("no fanout, target is 0"); + + return Ok(Vec::new()); + } // Create futures for all replicas (excluding the sender) let mut responses = futures_util::stream::iter( @@ -57,32 +63,22 @@ where ) .collect::>() .await; - tracing::debug!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size"); - - // Choose how many successful responses we need before considering a success - let target_responses = match quorum_type { - // Only require 1 response - utils::QuorumType::Any => 1, - // Include all responses - utils::QuorumType::All => responses.len(), - // Subtract 1 from quorum size since we're not counting ourselves - utils::QuorumType::Fast | utils::QuorumType::Slow => quorum_size - 1, - }; + tracing::debug!(?target_responses, len=?responses.len(), "fanout target"); // Collect responses until we reach quorum or all futures complete let mut successful_responses = Vec::new(); while successful_responses.len() < target_responses { if let Some(response) = responses.next().await { match response { - std::result::Result::Ok(result) => match result { - std::result::Result::Ok(response) => { + Ok(result) => match result { + Ok(response) => { successful_responses.push(response); } - std::result::Result::Err(err) => { + Err(err) => { tracing::warn!(?err, "received error from replica"); } }, - std::result::Result::Err(err) => { + Err(err) => { tracing::warn!(?err, "received timeout from replica"); } } @@ -159,8 +155,8 @@ pub async fn send_message_to_address( .await; let response = match response_result { - std::result::Result::Ok(resp) => resp, - std::result::Result::Err(e) => { + Ok(resp) => resp, + Err(e) => { tracing::error!( to_replica = to_replica_id, replica_url = %replica_url, diff --git a/engine/packages/epoxy/src/utils.rs b/engine/packages/epoxy/src/utils.rs index cd7f51953a..a0840a0102 100644 --- a/engine/packages/epoxy/src/utils.rs +++ b/engine/packages/epoxy/src/utils.rs @@ -43,12 +43,54 @@ pub fn get_all_replicas(config: &protocol::ClusterConfig) -> Vec { config.replicas.iter().map(|r| r.replica_id).collect() } +// See EPaxos 4.3 pub fn calculate_quorum(n: usize, q: QuorumType) -> usize { - match q { - QuorumType::Fast => (n * 3) / 4 + 1, - QuorumType::Slow => n / 2 + 1, - QuorumType::All => n, - QuorumType::Any => 1, + match n { + // Nonsensical + 0 => 0, + 1 => 1, + // EPaxos does not apply to clusters with N < 3 because you cannot tolerate any faults. However we can + // still get correctness invariants to hold by requiring both nodes to agree on everything (quorum + // size is always 2) + 2 => match q { + QuorumType::Fast => 2, + QuorumType::Slow => 2, + QuorumType::All => 2, + QuorumType::Any => 1, + }, + // Note that for even N's we don't gain any extra fault tolerance but we get potentially better read + // latency. N=4 acts like N=3 in terms of fault tolerance. + n => { + let f = (n - 1) / 2; + + match q { + QuorumType::Fast => f + (f + 1) / 2, + QuorumType::Slow => f + 1, + QuorumType::All => n, + QuorumType::Any => 1, + } + } + } +} + +/// Calculates quorum size assuming the sender is excluded. +pub fn calculate_fanout_quorum(n: usize, q: QuorumType) -> usize { + match n { + // Nonsensical + 0 => 0, + 1 => 0, + // NOTE: See comments in `calculate_quorum` + 2 => 1, + n => { + let f = (n - 1) / 2; + + match q { + QuorumType::Fast => (f + (f + 1) / 2) - 1, + QuorumType::Slow => f, + QuorumType::All => n - 1, + QuorumType::Any => 1, + } + } } } diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-error-serialization.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-error-serialization.ts new file mode 100644 index 0000000000..900943bbc8 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-error-serialization.ts @@ -0,0 +1,27 @@ +import { actor } from "rivetkit"; +import { ActorError } from "@/actor/errors"; + +// Custom error that will be thrown in createConnState +class CustomConnectionError extends ActorError { + constructor(message: string) { + super("connection", "custom_error", message, { public: true }); + } +} + +/** + * Actor that throws a custom error in createConnState to test error serialization + */ +export const connErrorSerializationActor = actor({ + state: { + value: 0, + }, + createConnState: (_c, params: { shouldThrow?: boolean }) => { + if (params.shouldThrow) { + throw new CustomConnectionError("Test error from createConnState"); + } + return { initialized: true }; + }, + actions: { + getValue: (c) => c.state.value, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index d85c7e881e..4b9b840a39 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -75,6 +75,8 @@ import { workflowSleepActor, workflowStopTeardownActor, } from "./workflow"; +import { startStopRaceActor, lifecycleObserver } from "./start-stop-race"; +import { connErrorSerializationActor } from "./conn-error-serialization"; // Consolidated setup with all actors export const registry = setup({ @@ -177,5 +179,10 @@ export const registry = setup({ // From access-control.ts accessControlActor, accessControlNoQueuesActor, + // From start-stop-race.ts + startStopRaceActor, + lifecycleObserver, + // From conn-error-serialization.ts + connErrorSerializationActor, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts new file mode 100644 index 0000000000..9fad609233 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts @@ -0,0 +1,71 @@ +import { actor } from "rivetkit"; + +/** + * Actor designed to test start/stop race conditions. + * Has a slow initialization to make race conditions easier to trigger. + */ +export const startStopRaceActor = actor({ + state: { + initialized: false, + startTime: 0, + destroyCalled: false, + startCompleted: false, + }, + onWake: async (c) => { + c.state.startTime = Date.now(); + + // Simulate slow initialization to create window for race condition + await new Promise((resolve) => setTimeout(resolve, 100)); + + c.state.initialized = true; + c.state.startCompleted = true; + }, + onDestroy: (c) => { + c.state.destroyCalled = true; + // Don't save state here - the actor framework will save it automatically + }, + actions: { + getState: (c) => { + return { + initialized: c.state.initialized, + startTime: c.state.startTime, + destroyCalled: c.state.destroyCalled, + startCompleted: c.state.startCompleted, + }; + }, + ping: (c) => { + return "pong"; + }, + destroy: (c) => { + c.destroy(); + }, + }, +}); + +/** + * Observer actor to track lifecycle events from other actors + */ +export const lifecycleObserver = actor({ + state: { + events: [] as Array<{ + actorKey: string; + event: string; + timestamp: number; + }>, + }, + actions: { + recordEvent: (c, params: { actorKey: string; event: string }) => { + c.state.events.push({ + actorKey: params.actorKey, + event: params.event, + timestamp: Date.now(), + }); + }, + getEvents: (c) => { + return c.state.events; + }, + clearEvents: (c) => { + c.state.events = []; + }, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index cf3590672e..98db320e1a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -17,6 +17,7 @@ import { runActorConnTests } from "./tests/actor-conn"; import { runActorConnHibernationTests } from "./tests/actor-conn-hibernation"; import { runActorConnStateTests } from "./tests/actor-conn-state"; import { runActorDbTests } from "./tests/actor-db"; +import { runConnErrorSerializationTests } from "./tests/conn-error-serialization"; import { runActorDestroyTests } from "./tests/actor-destroy"; import { runActorDriverTests } from "./tests/actor-driver"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; @@ -111,6 +112,8 @@ export function runDriverTests( runActorConnHibernationTests(driverTestConfig); + runConnErrorSerializationTests(driverTestConfig); + runActorDbTests(driverTestConfig); runActorDestroyTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts index 438348285a..efa2d96cd9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts @@ -1,5 +1,6 @@ import { describe } from "vitest"; import type { DriverTestConfig } from "../mod"; +import { runActorLifecycleTests } from "./actor-lifecycle"; import { runActorScheduleTests } from "./actor-schedule"; import { runActorSleepTests } from "./actor-sleep"; import { runActorStateTests } from "./actor-state"; @@ -14,5 +15,8 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfig) { // Run actor sleep tests runActorSleepTests(driverTestConfig); + + // Run actor lifecycle tests + runActorLifecycleTests(driverTestConfig); }); } diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-lifecycle.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-lifecycle.ts new file mode 100644 index 0000000000..7333cfa977 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-lifecycle.ts @@ -0,0 +1,157 @@ +import { describe, expect, test } from "vitest"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest } from "../utils"; + +export function runActorLifecycleTests(driverTestConfig: DriverTestConfig) { + describe("Actor Lifecycle Tests", () => { + test("actor stop during start waits for start to complete", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-stop-during-start-${Date.now()}`; + + // Create actor - this starts the actor + const actor = client.startStopRaceActor.getOrCreate([actorKey]); + + // Immediately try to call an action and then destroy + // This creates a race where the actor might not be fully started yet + const pingPromise = actor.ping(); + + // Get actor ID + const actorId = await actor.resolve(); + + // Destroy immediately while start might still be in progress + await actor.destroy(); + + // The ping should still complete successfully because destroy waits for start + const result = await pingPromise; + expect(result).toBe("pong"); + + // Verify actor was actually destroyed + let destroyed = false; + try { + await client.startStopRaceActor.getForId(actorId).ping(); + } catch (err: any) { + destroyed = true; + expect(err.group).toBe("actor"); + expect(err.code).toBe("not_found"); + } + expect(destroyed).toBe(true); + }); + + test("actor stop before actor instantiation completes cleans up handler", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-stop-before-instantiation-${Date.now()}`; + + // Create multiple actors rapidly to increase chance of race + const actors = Array.from({ length: 5 }, (_, i) => + client.startStopRaceActor.getOrCreate([ + `${actorKey}-${i}`, + ]), + ); + + // Resolve all actor IDs (this triggers start) + const ids = await Promise.all(actors.map((a) => a.resolve())); + + // Immediately destroy all actors + await Promise.all(actors.map((a) => a.destroy())); + + // Verify all actors were cleaned up + for (const id of ids) { + let destroyed = false; + try { + await client.startStopRaceActor.getForId(id).ping(); + } catch (err: any) { + destroyed = true; + expect(err.group).toBe("actor"); + expect(err.code).toBe("not_found"); + } + expect(destroyed, `actor ${id} should be destroyed`).toBe( + true, + ); + } + }); + + test("onBeforeActorStart completes before stop proceeds", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-before-actor-start-${Date.now()}`; + + // Create actor + const actor = client.startStopRaceActor.getOrCreate([actorKey]); + + // Call action to ensure actor is starting + const statePromise = actor.getState(); + + // Destroy immediately + await actor.destroy(); + + // State should be initialized because onBeforeActorStart must complete + const state = await statePromise; + expect(state.initialized).toBe(true); + expect(state.startCompleted).toBe(true); + }); + + test("multiple rapid create/destroy cycles handle race correctly", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Perform multiple rapid create/destroy cycles + for (let i = 0; i < 10; i++) { + const actorKey = `test-rapid-cycle-${Date.now()}-${i}`; + const actor = client.startStopRaceActor.getOrCreate([ + actorKey, + ]); + + // Trigger start + const resolvePromise = actor.resolve(); + + // Immediately destroy + const destroyPromise = actor.destroy(); + + // Both should complete without errors + await Promise.all([resolvePromise, destroyPromise]); + } + + // If we get here without errors, the race condition is handled correctly + expect(true).toBe(true); + }); + + test("actor stop called with no actor instance cleans up handler", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-cleanup-no-instance-${Date.now()}`; + + // Create and immediately destroy + const actor = client.startStopRaceActor.getOrCreate([actorKey]); + const id = await actor.resolve(); + await actor.destroy(); + + // Try to recreate with same key - should work without issues + const newActor = client.startStopRaceActor.getOrCreate([ + actorKey, + ]); + const result = await newActor.ping(); + expect(result).toBe("pong"); + + // Clean up + await newActor.destroy(); + }); + + test("onDestroy is called even when actor is destroyed during start", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-ondestroy-during-start-${Date.now()}`; + + // Create actor + const actor = client.startStopRaceActor.getOrCreate([actorKey]); + + // Start and immediately destroy + const statePromise = actor.getState(); + await actor.destroy(); + + // Verify onDestroy was called (requires actor to be started) + const state = await statePromise; + expect(state.destroyCalled).toBe(true); + }); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/conn-error-serialization.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/conn-error-serialization.ts new file mode 100644 index 0000000000..e5ccf1ef23 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/conn-error-serialization.ts @@ -0,0 +1,64 @@ +import { describe, expect, test } from "vitest"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest } from "../utils"; + +export function runConnErrorSerializationTests(driverTestConfig: DriverTestConfig) { + describe("Connection Error Serialization Tests", () => { + test("error thrown in createConnState preserves group and code through WebSocket serialization", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-error-serialization-${Date.now()}`; + + // Create actor handle with params that will trigger error in createConnState + const actor = client.connErrorSerializationActor.getOrCreate( + [actorKey], + { params: { shouldThrow: true } }, + ); + + // Try to connect, which will trigger error in createConnState + const conn = actor.connect(); + + // Wait for connection to fail + let caughtError: any; + try { + // Try to call an action, which should fail because connection couldn't be established + await conn.getValue(); + } catch (err) { + caughtError = err; + } + + // Verify the error was caught + expect(caughtError).toBeDefined(); + + // Verify the error has the correct group and code from the original error + // Original error: new CustomConnectionError("...") with group="connection", code="custom_error" + expect(caughtError.group).toBe("connection"); + expect(caughtError.code).toBe("custom_error"); + + // Clean up + await conn.dispose(); + }); + + test("successful createConnState does not throw error", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = `test-no-error-${Date.now()}`; + + // Create actor handle with params that will NOT trigger error + const actor = client.connErrorSerializationActor.getOrCreate( + [actorKey], + { params: { shouldThrow: false } }, + ); + + // Connect without triggering error + const conn = actor.connect(); + + // This should succeed + const value = await conn.getValue(); + expect(value).toBe(0); + + // Clean up + await conn.dispose(); + }); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index fb3b50f133..b76c5f69d2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -156,7 +156,7 @@ export class EngineActorDriver implements ActorDriver { onConnected: () => { this.#runnerStarted.resolve(undefined); }, - onDisconnected: (_code, _reason) => {}, + onDisconnected: (_code, _reason) => { }, onShutdown: () => { this.#runnerStopped.resolve(undefined); this.#isRunnerStopped = true; @@ -395,7 +395,7 @@ export class EngineActorDriver implements ActorDriver { async serverlessHandleStart(c: HonoContext): Promise { return streamSSE(c, async (stream) => { // NOTE: onAbort does not work reliably - stream.onAbort(() => {}); + stream.onAbort(() => { }); c.req.raw.signal.addEventListener("abort", () => { logger().debug("SSE aborted, shutting down runner"); @@ -514,9 +514,9 @@ export class EngineActorDriver implements ActorDriver { const error = innerError instanceof Error ? new Error( - `Failed to start actor ${actorId}: ${innerError.message}`, - { cause: innerError }, - ) + `Failed to start actor ${actorId}: ${innerError.message}`, + { cause: innerError }, + ) : new Error(`Failed to start actor ${actorId}: ${String(innerError)}`); handler.actor = undefined; handler.actorStartError = error; @@ -559,15 +559,26 @@ export class EngineActorDriver implements ActorDriver { this.#actorStopIntent.delete(actorId); const handler = this.#actors.get(actorId); - if (handler?.actorStartPromise) { - const startError = - handler.actorStartError ?? - new Error(`Actor ${actorId} stopped before start completed`); - handler.actorStartError = startError; - handler.actorStartPromise.reject(startError); - handler.actorStartPromise = undefined; + if (!handler) { + logger().debug({ msg: "no runner actor handler to stop", actorId, reason }); + return; + } + + if (handler.actorStartPromise) { + try { + logger().debug({ msg: "runner actor stopping before it started, waiting", actorId, generation }); + await handler.actorStartPromise.promise; + } catch (err) { + // Start failed, but we still want to clean up the handler + logger().debug({ + msg: "actor start failed during stop, cleaning up handler", + actorId, + err: stringifyError(err), + }); + } } - if (handler?.actor) { + + if (handler.actor) { try { await handler.actor.onStop(reason); } catch (err) { @@ -577,7 +588,8 @@ export class EngineActorDriver implements ActorDriver { }); } } - if (handler) this.#actors.delete(actorId); + + this.#actors.delete(actorId); logger().debug({ msg: "runner actor stopped", actorId, reason }); }