Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/apollo_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ strum_macros.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full", "sync"] }
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
tokio-retry.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::fmt::Display;

use clap::{Parser, ValueEnum};

#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
pub enum Mode {
/// All nodes broadcast messages
#[value(name = "all")]
AllBroadcast,
/// Only the node specified by --broadcaster broadcasts messages
#[value(name = "one")]
OneBroadcast,
/// Nodes take turns broadcasting in round-robin fashion
#[value(name = "rr")]
RoundRobin,
/// Only the node specified by --broadcaster broadcasts messages,
/// Every explore_run_duration_seconds + explore_cool_down_duration_seconds seconds
/// a new combination of MPS and message size is explored.
/// Increases the throughput with each new trial.
/// Configurations are filtered by minimum throughput and minimum message size.
#[value(name = "explore")]
Explore,
}

#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
pub enum NetworkProtocol {
/// Use gossipsub for broadcasting (default)
#[value(name = "gossipsub")]
Gossipsub,
/// Use SQMR for point-to-point communication
#[value(name = "sqmr")]
Sqmr,
/// Use Reversed SQMR where receivers initiate requests to broadcasters
#[value(name = "reversed-sqmr")]
ReveresedSqmr,
}

impl Display for Mode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_possible_value().unwrap().get_name())
}
}

impl Display for NetworkProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_possible_value().unwrap().get_name())
}
}

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
/// ID for Prometheus logging
#[arg(short, long, env)]
pub id: u64,

/// Total number of nodes in the network
#[arg(long, env)]
pub num_nodes: u64,

/// The port to run the Prometheus metrics server on
#[arg(long, env)]
pub metric_port: u16,

/// The port to run the P2P network on
#[arg(short, env, long)]
pub p2p_port: u16,

/// The addresses of the bootstrap peers (can specify multiple)
#[arg(long, env, value_delimiter = ',')]
pub bootstrap: Vec<String>,

/// Set the verbosity level of the logger, the higher the more verbose
#[arg(short, long, env)]
pub verbosity: u8,

/// Buffer size for the broadcast topic
#[arg(long, env)]
pub buffer_size: usize,

/// The mode to use for the stress test.
#[arg(long, env)]
pub mode: Mode,

/// The network protocol to use for communication (default: gossipsub)
#[arg(long, env)]
pub network_protocol: NetworkProtocol,

/// Which node ID should do the broadcasting - for OneBroadcast and Explore modes
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "explore")]))]
pub broadcaster: Option<u64>,

/// Duration each node broadcasts before switching (in seconds) - for RoundRobin mode
#[arg(long, env, required_if_eq("mode", "rr"))]
pub round_duration_seconds: Option<u64>,

/// Size of StressTestMessage
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))]
pub message_size_bytes: Option<usize>,

/// The time to sleep between broadcasts of StressTestMessage in milliseconds
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))]
pub heartbeat_millis: Option<u64>,

/// Cool down duration between configuration changes in seconds - for Explore mode
#[arg(long, env, required_if_eq("mode", "explore"))]
pub explore_cool_down_duration_seconds: Option<u64>,

/// Duration to run each configuration in seconds - for Explore mode
#[arg(long, env, required_if_eq("mode", "explore"))]
pub explore_run_duration_seconds: Option<u64>,

/// Minimum throughput in bytes per second - for Explore mode
#[arg(long, env, required_if_eq("mode", "explore"))]
pub explore_min_throughput_byte_per_seconds: Option<f64>,

/// Minimum message size in bytes - for Explore mode
#[arg(long, env, required_if_eq("mode", "explore"))]
pub explore_min_message_size_bytes: Option<usize>,

/// The timeout in seconds for the node.
/// When the node runs for longer than this, it will be killed.
#[arg(long, env)]
pub timeout: u64,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::time::Duration;

use crate::args::Args;
use crate::metrics::{get_throughput, seconds_since_epoch};

const EXPLORE_MESSAGE_SIZES_BYTES: [usize; 13] = [
1 << 10,
1 << 11,
1 << 12,
1 << 13,
1 << 14,
1 << 15,
1 << 16,
1 << 17,
1 << 18,
1 << 19,
1 << 20,
1 << 21,
1 << 22,
];
const EXPLORE_MESSAGE_HEARTBEAT_MILLIS: [u64; 16] =
[1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 100, 150, 200, 250, 500, 1000];

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExplorePhase {
/// In cooldown period - no broadcasting should occur
CoolDown,
/// In running period - broadcasting should occur (if this node is the broadcaster)
Running,
}

#[derive(Clone)]
pub struct ExploreConfiguration {
sorted_configurations: Vec<(usize, Duration)>,
/// The broadcaster configuration index
configuration_index: usize,
/// Duration of the Running phase of the cycle
run_duration_seconds: u64,
/// Total duration for one complete cycle (cooldown + run_duration_seconds)
cycle_duration_seconds: u64,
}

impl ExploreConfiguration {
pub fn new(
cool_down_duration_seconds: u64,
run_duration_seconds: u64,
min_throughput_byte_per_seconds: f64,
min_message_size_bytes: usize,
) -> ExploreConfiguration {
let mut sorted_configurations = Vec::with_capacity(
EXPLORE_MESSAGE_SIZES_BYTES.len() * EXPLORE_MESSAGE_HEARTBEAT_MILLIS.len(),
);
for message_size in EXPLORE_MESSAGE_SIZES_BYTES {
for heartbeat_millis in EXPLORE_MESSAGE_HEARTBEAT_MILLIS {
sorted_configurations.push((message_size, Duration::from_millis(heartbeat_millis)));
}
}
sorted_configurations.retain(|(size, duration)| {
*size >= min_message_size_bytes
&& get_throughput(*size, *duration) >= min_throughput_byte_per_seconds
});
sorted_configurations
.sort_by_cached_key(|(size, duration)| get_throughput(*size, *duration) as u64);

let cycle_duration_seconds = cool_down_duration_seconds + run_duration_seconds;

Self {
sorted_configurations,
configuration_index: 0,
run_duration_seconds,
cycle_duration_seconds,
}
}

/// Gets the current phase within the current configuration cycle
pub fn get_current_phase(&self) -> ExplorePhase {
let now_seconds = seconds_since_epoch();
let position_in_cycle_seconds = now_seconds % self.cycle_duration_seconds;

if position_in_cycle_seconds < self.run_duration_seconds {
ExplorePhase::Running
} else {
ExplorePhase::CoolDown
}
}

/// Gets the current message size and duration based on synchronized time
pub fn get_current_size_and_heartbeat(&mut self) -> (usize, Duration) {
let config_index = self.configuration_index;
self.configuration_index += 1;
if self.configuration_index >= self.sorted_configurations.len() {
self.configuration_index = 0;
}
self.sorted_configurations[config_index]
}
}

/// Extracts explore mode parameters from arguments with validation
pub fn extract_explore_params(args: &Args) -> (u64, u64, f64, usize) {
let cool_down = args
.explore_cool_down_duration_seconds
.expect("explore_cool_down_duration_seconds required for explore mode");
let run_duration = args
.explore_run_duration_seconds
.expect("explore_run_duration_seconds required for explore mode");
let min_throughput = args
.explore_min_throughput_byte_per_seconds
.expect("explore_min_throughput_byte_per_seconds required for explore mode");
let min_message_size = args
.explore_min_message_size_bytes
.expect("explore_min_message_size_bytes required for explore mode");

(cool_down, run_duration, min_throughput, min_message_size)
}
Loading
Loading