Skip to content

Commit 735f358

Browse files
apollo_network: broadcast network stress test draft
1 parent d6d6346 commit 735f358

27 files changed

+4431
-241
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ strum_macros.workspace = true
4242
sysinfo.workspace = true
4343
thiserror.workspace = true
4444
tokio = { workspace = true, features = ["full", "sync"] }
45+
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
4546
tokio-retry.workspace = true
4647
tracing.workspace = true
4748
tracing-subscriber.workspace = true

crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md

Lines changed: 435 additions & 25 deletions
Large diffs are not rendered by default.
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::fmt::Display;
2+
3+
use clap::{Parser, ValueEnum};
4+
5+
/// Broadcast scheduling strategy of the stress test, stored in the `mode` field of [`Args`].
///
/// Each variant's `#[value(name = ...)]` is the string form of the variant; the `Display`
/// implementation of this type prints that same name.
#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
pub enum Mode {
    /// All nodes broadcast messages
    #[value(name = "all")]
    AllBroadcast,
    /// Only the node specified by --broadcaster broadcasts messages
    #[value(name = "one")]
    OneBroadcast,
    /// Nodes take turns broadcasting in round-robin fashion
    #[value(name = "rr")]
    RoundRobin,
    /// Only the node specified by --broadcaster broadcasts messages,
    /// Every explore_run_duration_seconds + explore_cool_down_duration_seconds seconds
    /// a new combination of MPS and message size is explored.
    /// Increases the throughput with each new trial.
    /// Configurations are filtered by minimum throughput and minimum message size.
    #[value(name = "explore")]
    Explore,
}
24+
25+
/// Transport used by the stress test, stored in the `network_protocol` field of [`Args`].
///
/// Each variant's `#[value(name = ...)]` is the string form of the variant; the `Display`
/// implementation of this type prints that same name.
#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
pub enum NetworkProtocol {
    // NOTE(review): "(default)" below is only accurate if the argument holding this enum
    // declares gossipsub as its default value - confirm against the `Args` definition.
    /// Use gossipsub for broadcasting (default)
    #[value(name = "gossipsub")]
    Gossipsub,
    /// Use SQMR for point-to-point communication
    #[value(name = "sqmr")]
    Sqmr,
    // NOTE(review): the variant identifier is misspelled ("Reveresed" instead of "Reversed").
    // The CLI name "reversed-sqmr" is spelled correctly; renaming the variant requires
    // updating every user of `NetworkProtocol::ReveresedSqmr`.
    /// Use Reversed SQMR where receivers initiate requests to broadcasters
    #[value(name = "reversed-sqmr")]
    ReveresedSqmr,
}
37+
38+
impl Display for Mode {
39+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40+
write!(f, "{}", self.to_possible_value().unwrap().get_name())
41+
}
42+
}
43+
44+
impl Display for NetworkProtocol {
45+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46+
write!(f, "{}", self.to_possible_value().unwrap().get_name())
47+
}
48+
}
49+
50+
#[derive(Parser, Debug, Clone)]
51+
#[command(version, about, long_about = None)]
52+
pub struct Args {
53+
/// ID for Prometheus logging
54+
#[arg(short, long, env)]
55+
pub id: u64,
56+
57+
/// Total number of nodes in the network
58+
#[arg(long, env)]
59+
pub num_nodes: u64,
60+
61+
/// The port to run the Prometheus metrics server on
62+
#[arg(long, env)]
63+
pub metric_port: u16,
64+
65+
/// The port to run the P2P network on
66+
#[arg(short, env, long)]
67+
pub p2p_port: u16,
68+
69+
/// The addresses of the bootstrap peers (can specify multiple)
70+
#[arg(long, env, value_delimiter = ',')]
71+
pub bootstrap: Vec<String>,
72+
73+
/// Set the verbosity level of the logger, the higher the more verbose
74+
#[arg(short, long, env)]
75+
pub verbosity: u8,
76+
77+
/// Buffer size for the broadcast topic
78+
#[arg(long, env)]
79+
pub buffer_size: usize,
80+
81+
/// The mode to use for the stress test.
82+
#[arg(long, env)]
83+
pub mode: Mode,
84+
85+
/// The network protocol to use for communication (default: gossipsub)
86+
#[arg(long, env)]
87+
pub network_protocol: NetworkProtocol,
88+
89+
/// Which node ID should do the broadcasting - for OneBroadcast and Explore modes
90+
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "explore")]))]
91+
pub broadcaster: Option<u64>,
92+
93+
/// Duration each node broadcasts before switching (in seconds) - for RoundRobin mode
94+
#[arg(long, env, required_if_eq("mode", "rr"))]
95+
pub round_duration_seconds: Option<u64>,
96+
97+
/// Size of StressTestMessage
98+
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))]
99+
pub message_size_bytes: Option<usize>,
100+
101+
/// The time to sleep between broadcasts of StressTestMessage in milliseconds
102+
#[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))]
103+
pub heartbeat_millis: Option<u64>,
104+
105+
/// Cool down duration between configuration changes in seconds - for Explore mode
106+
#[arg(long, env, required_if_eq("mode", "explore"))]
107+
pub explore_cool_down_duration_seconds: Option<u64>,
108+
109+
/// Duration to run each configuration in seconds - for Explore mode
110+
#[arg(long, env, required_if_eq("mode", "explore"))]
111+
pub explore_run_duration_seconds: Option<u64>,
112+
113+
/// Minimum throughput in bytes per second - for Explore mode
114+
#[arg(long, env, required_if_eq("mode", "explore"))]
115+
pub explore_min_throughput_byte_per_seconds: Option<f64>,
116+
117+
/// Minimum message size in bytes - for Explore mode
118+
#[arg(long, env, required_if_eq("mode", "explore"))]
119+
pub explore_min_message_size_bytes: Option<usize>,
120+
121+
/// The timeout in seconds for the node.
122+
/// When the node runs for longer than this, it will be killed.
123+
#[arg(long, env)]
124+
pub timeout: u64,
125+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use std::time::Duration;
2+
3+
use crate::args::Args;
4+
use crate::metrics::{get_throughput, seconds_since_epoch};
5+
6+
/// Candidate message sizes in bytes tried by `ExploreConfiguration`: the powers of two from
/// `1 << 10` (1 KiB) up to `1 << 22` (4 MiB).
const EXPLORE_MESSAGE_SIZES_BYTES: [usize; 13] = [
    1 << 10,
    1 << 11,
    1 << 12,
    1 << 13,
    1 << 14,
    1 << 15,
    1 << 16,
    1 << 17,
    1 << 18,
    1 << 19,
    1 << 20,
    1 << 21,
    1 << 22,
];
/// Candidate heartbeat values in milliseconds tried by `ExploreConfiguration`; each one is
/// paired with every entry of `EXPLORE_MESSAGE_SIZES_BYTES`.
const EXPLORE_MESSAGE_HEARTBEAT_MILLIS: [u64; 16] =
    [1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 100, 150, 200, 250, 500, 1000];
23+
24+
/// Phase of an explore cycle, as returned by `ExploreConfiguration::get_current_phase`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExplorePhase {
    /// In cooldown period - no broadcasting should occur
    CoolDown,
    /// In running period - broadcasting should occur (if this node is the broadcaster)
    Running,
}
31+
32+
/// State of Explore mode: the candidate `(message size, heartbeat)` configurations, the position
/// within them, and the timing of the run / cool-down cycle.
#[derive(Clone)]
pub struct ExploreConfiguration {
    /// `(message size in bytes, heartbeat)` pairs that passed the minimum filters, ordered by
    /// ascending throughput (as computed by `get_throughput`, truncated to an integer).
    sorted_configurations: Vec<(usize, Duration)>,
    /// The broadcaster configuration index
    configuration_index: usize,
    /// Duration of the Running phase of the cycle
    run_duration_seconds: u64,
    /// Total duration for one complete cycle (cooldown + run_duration_seconds)
    cycle_duration_seconds: u64,
}
42+
43+
impl ExploreConfiguration {
44+
pub fn new(
45+
cool_down_duration_seconds: u64,
46+
run_duration_seconds: u64,
47+
min_throughput_byte_per_seconds: f64,
48+
min_message_size_bytes: usize,
49+
) -> ExploreConfiguration {
50+
let mut sorted_configurations = Vec::with_capacity(
51+
EXPLORE_MESSAGE_SIZES_BYTES.len() * EXPLORE_MESSAGE_HEARTBEAT_MILLIS.len(),
52+
);
53+
for message_size in EXPLORE_MESSAGE_SIZES_BYTES {
54+
for heartbeat_millis in EXPLORE_MESSAGE_HEARTBEAT_MILLIS {
55+
sorted_configurations.push((message_size, Duration::from_millis(heartbeat_millis)));
56+
}
57+
}
58+
sorted_configurations.retain(|(size, duration)| {
59+
*size >= min_message_size_bytes
60+
&& get_throughput(*size, *duration) >= min_throughput_byte_per_seconds
61+
});
62+
sorted_configurations
63+
.sort_by_cached_key(|(size, duration)| get_throughput(*size, *duration) as u64);
64+
65+
let cycle_duration_seconds = cool_down_duration_seconds + run_duration_seconds;
66+
67+
Self {
68+
sorted_configurations,
69+
configuration_index: 0,
70+
run_duration_seconds,
71+
cycle_duration_seconds,
72+
}
73+
}
74+
75+
/// Gets the current phase within the current configuration cycle
76+
pub fn get_current_phase(&self) -> ExplorePhase {
77+
let now_seconds = seconds_since_epoch();
78+
let position_in_cycle_seconds = now_seconds % self.cycle_duration_seconds;
79+
80+
if position_in_cycle_seconds < self.run_duration_seconds {
81+
ExplorePhase::Running
82+
} else {
83+
ExplorePhase::CoolDown
84+
}
85+
}
86+
87+
/// Gets the current message size and duration based on synchronized time
88+
pub fn get_current_size_and_heartbeat(&mut self) -> (usize, Duration) {
89+
let config_index = self.configuration_index;
90+
self.configuration_index += 1;
91+
if self.configuration_index >= self.sorted_configurations.len() {
92+
self.configuration_index = 0;
93+
}
94+
self.sorted_configurations[config_index]
95+
}
96+
}
97+
98+
/// Extracts explore mode parameters from arguments with validation
99+
pub fn extract_explore_params(args: &Args) -> (u64, u64, f64, usize) {
100+
let cool_down = args
101+
.explore_cool_down_duration_seconds
102+
.expect("explore_cool_down_duration_seconds required for explore mode");
103+
let run_duration = args
104+
.explore_run_duration_seconds
105+
.expect("explore_run_duration_seconds required for explore mode");
106+
let min_throughput = args
107+
.explore_min_throughput_byte_per_seconds
108+
.expect("explore_min_throughput_byte_per_seconds required for explore mode");
109+
let min_message_size = args
110+
.explore_min_message_size_bytes
111+
.expect("explore_min_message_size_bytes required for explore mode");
112+
113+
(cool_down, run_duration, min_throughput, min_message_size)
114+
}

0 commit comments

Comments
 (0)