Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions engine/packages/epoxy/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::*;
use anyhow::{Context, Result, bail};
use epoxy_protocol::{
PROTOCOL_VERSION,
protocol::{self, ReplicaId},
Expand Down Expand Up @@ -37,7 +37,13 @@ where
Fut: Future<Output = Result<T>> + Send,
T: Send,
{
let quorum_size = utils::calculate_quorum(replica_ids.len(), quorum_type);
let target_responses = utils::calculate_fanout_quorum(replica_ids.len(), quorum_type);

if target_responses == 0 {
tracing::warn!("no fanout, target is 0");

return Ok(Vec::new());
}

// Create futures for all replicas (excluding the sender)
let mut responses = futures_util::stream::iter(
Expand All @@ -57,32 +63,22 @@ where
)
.collect::<FuturesUnordered<_>>()
.await;
tracing::debug!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size");

// Choose how many successful responses we need before considering a success
let target_responses = match quorum_type {
// Only require 1 response
utils::QuorumType::Any => 1,
// Include all responses
utils::QuorumType::All => responses.len(),
// Subtract 1 from quorum size since we're not counting ourselves
utils::QuorumType::Fast | utils::QuorumType::Slow => quorum_size - 1,
};
tracing::debug!(?target_responses, len=?responses.len(), "fanout target");

// Collect responses until we reach quorum or all futures complete
let mut successful_responses = Vec::new();
while successful_responses.len() < target_responses {
if let Some(response) = responses.next().await {
match response {
std::result::Result::Ok(result) => match result {
std::result::Result::Ok(response) => {
Ok(result) => match result {
Ok(response) => {
successful_responses.push(response);
}
std::result::Result::Err(err) => {
Err(err) => {
tracing::warn!(?err, "received error from replica");
}
},
std::result::Result::Err(err) => {
Err(err) => {
tracing::warn!(?err, "received timeout from replica");
}
}
Expand Down Expand Up @@ -159,8 +155,8 @@ pub async fn send_message_to_address(
.await;

let response = match response_result {
std::result::Result::Ok(resp) => resp,
std::result::Result::Err(e) => {
Ok(resp) => resp,
Err(e) => {
tracing::error!(
to_replica = to_replica_id,
replica_url = %replica_url,
Expand Down
52 changes: 47 additions & 5 deletions engine/packages/epoxy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,54 @@ pub fn get_all_replicas(config: &protocol::ClusterConfig) -> Vec<ReplicaId> {
config.replicas.iter().map(|r| r.replica_id).collect()
}

// See EPaxos 4.3
pub fn calculate_quorum(n: usize, q: QuorumType) -> usize {
match q {
QuorumType::Fast => (n * 3) / 4 + 1,
QuorumType::Slow => n / 2 + 1,
QuorumType::All => n,
QuorumType::Any => 1,
match n {
// Nonsensical
0 => 0,
1 => 1,
// EPaxos does not apply to clusters with N < 3 because you cannot tolerate any faults. However we can
// still get correctness invariants to hold by requiring both nodes to agree on everything (quorum
// size is always 2)
2 => match q {
QuorumType::Fast => 2,
QuorumType::Slow => 2,
QuorumType::All => 2,
QuorumType::Any => 1,
},
// Note that for even N's we don't gain any extra fault tolerance but we get potentially better read
// latency. N=4 acts like N=3 in terms of fault tolerance.
n => {
let f = (n - 1) / 2;

match q {
QuorumType::Fast => f + (f + 1) / 2,
QuorumType::Slow => f + 1,
QuorumType::All => n,
QuorumType::Any => 1,
}
}
}
}

/// Calculates quorum size assuming the sender is excluded.
pub fn calculate_fanout_quorum(n: usize, q: QuorumType) -> usize {
match n {
// Nonsensical
0 => 0,
1 => 0,
// NOTE: See comments in `calculate_quorum`
2 => 1,
n => {
let f = (n - 1) / 2;

match q {
QuorumType::Fast => (f + (f + 1) / 2) - 1,
QuorumType::Slow => f,
QuorumType::All => n - 1,
QuorumType::Any => 1,
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { actor } from "rivetkit";
import { ActorError } from "@/actor/errors";

// Custom error that will be thrown in createConnState
class CustomConnectionError extends ActorError {
constructor(message: string) {
super("connection", "custom_error", message, { public: true });
}
}

/**
* Actor that throws a custom error in createConnState to test error serialization
*/
export const connErrorSerializationActor = actor({
state: {
value: 0,
},
createConnState: (_c, params: { shouldThrow?: boolean }) => {
if (params.shouldThrow) {
throw new CustomConnectionError("Test error from createConnState");
}
return { initialized: true };
},
actions: {
getValue: (c) => c.state.value,
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ import {
workflowSleepActor,
workflowStopTeardownActor,
} from "./workflow";
import { startStopRaceActor, lifecycleObserver } from "./start-stop-race";
import { connErrorSerializationActor } from "./conn-error-serialization";

// Consolidated setup with all actors
export const registry = setup({
Expand Down Expand Up @@ -177,5 +179,10 @@ export const registry = setup({
// From access-control.ts
accessControlActor,
accessControlNoQueuesActor,
// From start-stop-race.ts
startStopRaceActor,
lifecycleObserver,
// From conn-error-serialization.ts
connErrorSerializationActor,
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { actor } from "rivetkit";

/**
* Actor designed to test start/stop race conditions.
* Has a slow initialization to make race conditions easier to trigger.
*/
export const startStopRaceActor = actor({
state: {
initialized: false,
startTime: 0,
destroyCalled: false,
startCompleted: false,
},
onWake: async (c) => {
c.state.startTime = Date.now();

// Simulate slow initialization to create window for race condition
await new Promise((resolve) => setTimeout(resolve, 100));

c.state.initialized = true;
c.state.startCompleted = true;
},
onDestroy: (c) => {
c.state.destroyCalled = true;
// Don't save state here - the actor framework will save it automatically
},
actions: {
getState: (c) => {
return {
initialized: c.state.initialized,
startTime: c.state.startTime,
destroyCalled: c.state.destroyCalled,
startCompleted: c.state.startCompleted,
};
},
ping: (c) => {
return "pong";
},
destroy: (c) => {
c.destroy();
},
},
});

/**
* Observer actor to track lifecycle events from other actors
*/
export const lifecycleObserver = actor({
state: {
events: [] as Array<{
actorKey: string;
event: string;
timestamp: number;
}>,
},
actions: {
recordEvent: (c, params: { actorKey: string; event: string }) => {
c.state.events.push({
actorKey: params.actorKey,
event: params.event,
timestamp: Date.now(),
});
},
getEvents: (c) => {
return c.state.events;
},
clearEvents: (c) => {
c.state.events = [];
},
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { runActorConnTests } from "./tests/actor-conn";
import { runActorConnHibernationTests } from "./tests/actor-conn-hibernation";
import { runActorConnStateTests } from "./tests/actor-conn-state";
import { runActorDbTests } from "./tests/actor-db";
import { runConnErrorSerializationTests } from "./tests/conn-error-serialization";
import { runActorDestroyTests } from "./tests/actor-destroy";
import { runActorDriverTests } from "./tests/actor-driver";
import { runActorErrorHandlingTests } from "./tests/actor-error-handling";
Expand Down Expand Up @@ -111,6 +112,8 @@ export function runDriverTests(

runActorConnHibernationTests(driverTestConfig);

runConnErrorSerializationTests(driverTestConfig);

runActorDbTests(driverTestConfig);

runActorDestroyTests(driverTestConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe } from "vitest";
import type { DriverTestConfig } from "../mod";
import { runActorLifecycleTests } from "./actor-lifecycle";
import { runActorScheduleTests } from "./actor-schedule";
import { runActorSleepTests } from "./actor-sleep";
import { runActorStateTests } from "./actor-state";
Expand All @@ -14,5 +15,8 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfig) {

// Run actor sleep tests
runActorSleepTests(driverTestConfig);

// Run actor lifecycle tests
runActorLifecycleTests(driverTestConfig);
});
}
Loading
Loading