Skip to content

Commit 00dcd1d

Browse files
apollo_network: broadcast network stress test metrics
1 parent a1d81e6 commit 00dcd1d

File tree

5 files changed

+276
-16
lines changed

5 files changed

+276
-16
lines changed

Cargo.lock

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ statistical = "1.0.0"
360360
strum = "0.25.0"
361361
strum_macros = "0.25.2"
362362
syn = "2.0.39"
363+
sysinfo = "0.37.1"
363364
tar = "0.4.38"
364365
tempfile = "3.7.0"
365366
test-case = "3.2.1"

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ serde = { workspace = true, features = ["derive"] }
3939
starknet_api.workspace = true
4040
strum.workspace = true
4141
strum_macros.workspace = true
42+
sysinfo.workspace = true
4243
thiserror.workspace = true
4344
tokio = { workspace = true, features = ["full", "sync"] }
4445
tokio-retry.workspace = true

crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use futures::future::join_all;
2121
use futures::StreamExt;
2222
use libp2p::gossipsub::{Sha256Topic, Topic};
2323
use libp2p::Multiaddr;
24-
use metrics::{counter, gauge};
24+
// use metrics::{counter, gauge};
2525
use metrics_exporter_prometheus::PrometheusBuilder;
2626
use tokio::time::Duration;
2727
use tracing::{info, trace, Level};
@@ -30,6 +30,7 @@ use tracing::{info, trace, Level};
3030
mod converters_test;
3131

3232
mod converters;
33+
pub mod metrics;
3334
mod utils;
3435

3536
lazy_static::lazy_static! {
@@ -94,7 +95,7 @@ async fn send_stress_test_messages(
9495
message.metadata.message_index = i;
9596
broadcast_topic_client.broadcast_message(message.clone()).await.unwrap();
9697
trace!("Sent message {i}: {:?}", message);
97-
counter!("sent_messages").increment(1);
98+
// counter!("sent_messages").increment(1);
9899
tokio::time::sleep(duration).await;
99100
}
100101
}
@@ -112,24 +113,25 @@ fn receive_stress_test_message(
112113
Err(_) => panic!("Got a negative duration, the clocks are not synced!"),
113114
};
114115

115-
let delay_seconds = duration.as_secs_f64();
116-
let delay_micros = duration.as_micros().try_into().unwrap();
116+
let _delay_seconds = duration.as_secs_f64();
117+
// let delay_micros = duration.as_micros().try_into().unwrap();
117118

118119
// TODO(AndrewL): Concentrate all string metrics to constants in a different file
119-
counter!("message_received").increment(1);
120-
counter!(format!("message_received_from_{}", received_message.metadata.sender_id)).increment(1);
120+
// counter!("message_received").increment(1);
121+
// counter!(format!("message_received_from_{}",
122+
// received_message.metadata.sender_id)).increment(1);
121123

122124
// TODO(AndrewL): This should be a historgram
123-
gauge!("message_received_delay_seconds").set(delay_seconds);
124-
gauge!(format!("message_received_delay_seconds_from_{}", received_message.metadata.sender_id))
125-
.set(delay_seconds);
126-
127-
counter!("message_received_delay_micros_sum").increment(delay_micros);
128-
counter!(format!(
129-
"message_received_delay_micros_sum_from_{}",
130-
received_message.metadata.sender_id
131-
))
132-
.increment(delay_micros);
125+
// gauge!("message_received_delay_seconds").set(delay_seconds);
126+
// gauge!(format!("message_received_delay_seconds_from_{}",
127+
// received_message.metadata.sender_id)) .set(delay_seconds);
128+
129+
// counter!("message_received_delay_micros_sum").increment(delay_micros);
130+
// counter!(format!(
131+
// "message_received_delay_micros_sum_from_{}",
132+
// received_message.metadata.sender_id
133+
// ))
134+
// .increment(delay_micros);
133135
// TODO(AndrewL): Figure out what to log here
134136
}
135137

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
use std::collections::HashMap;
2+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
3+
4+
use apollo_metrics::define_metrics;
5+
use apollo_metrics::metrics::LossyIntoF64;
6+
use apollo_network::network_manager::metrics::{
7+
BroadcastNetworkMetrics,
8+
EventMetrics,
9+
NetworkMetrics,
10+
SqmrNetworkMetrics,
11+
EVENT_TYPE_LABELS,
12+
NETWORK_BROADCAST_DROP_LABELS,
13+
};
14+
use libp2p::gossipsub::{Sha256Topic, Topic};
15+
use sysinfo::{Networks, System};
16+
use tokio::time::interval;
17+
use tracing::warn;
18+
19+
use crate::converters::StressTestMessage;
20+
21+
lazy_static::lazy_static! {
22+
pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string());
23+
}
24+
25+
define_metrics!(
26+
Infra => {
27+
MetricGauge { BROADCAST_MESSAGE_HEARTBEAT_MILLIS, "broadcast_message_theoretical_heartbeat_millis", "The number of ms we sleep between each two consecutive broadcasts" },
28+
MetricGauge { BROADCAST_MESSAGE_THROUGHPUT, "broadcast_message_theoretical_throughput", "Throughput in bytes/second of the broadcasted " },
29+
MetricGauge { BROADCAST_MESSAGE_BYTES, "broadcast_message_bytes", "Size of the stress test sent message in bytes" },
30+
MetricCounter { BROADCAST_MESSAGE_COUNT, "broadcast_message_count", "Number of stress test messages sent via broadcast", init = 0 },
31+
MetricCounter { BROADCAST_MESSAGE_BYTES_SUM, "broadcast_message_bytes_sum", "Sum of the stress test messages sent via broadcast", init = 0 },
32+
MetricHistogram { BROADCAST_MESSAGE_SEND_DELAY_SECONDS, "broadcast_message_send_delay_seconds", "Message sending delay in seconds" },
33+
34+
MetricGauge { RECEIVE_MESSAGE_BYTES, "receive_message_bytes", "Size of the stress test received message in bytes" },
35+
MetricCounter { RECEIVE_MESSAGE_COUNT, "receive_message_count", "Number of stress test messages received via broadcast", init = 0 },
36+
MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 },
37+
MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" },
38+
MetricHistogram { RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS, "receive_message_negative_delay_seconds", "Negative message delay in seconds" },
39+
40+
MetricGauge { NETWORK_CONNECTED_PEERS, "network_connected_peers", "Number of connected peers in the network" },
41+
MetricGauge { NETWORK_BLACKLISTED_PEERS, "network_blacklisted_peers", "Number of blacklisted peers in the network" },
42+
MetricGauge { NETWORK_ACTIVE_INBOUND_SESSIONS, "network_active_inbound_sessions", "Number of active inbound SQMR sessions" },
43+
MetricGauge { NETWORK_ACTIVE_OUTBOUND_SESSIONS, "network_active_outbound_sessions", "Number of active outbound SQMR sessions" },
44+
MetricCounter { NETWORK_STRESS_TEST_SENT_MESSAGES, "network_stress_test_sent_messages", "Number of stress test messages sent via broadcast", init = 0 },
45+
MetricCounter { NETWORK_STRESS_TEST_RECEIVED_MESSAGES, "network_stress_test_received_messages", "Number of stress test messages received via broadcast", init = 0 },
46+
47+
MetricGauge { SYSTEM_PROCESS_CPU_USAGE_PERCENT, "system_process_cpu_usage_percent", "CPU usage percentage of the current process" },
48+
MetricGauge { SYSTEM_PROCESS_MEMORY_USAGE_BYTES, "system_process_memory_usage_bytes", "Memory usage in bytes of the current process" },
49+
MetricGauge { SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES, "system_process_virtual_memory_usage_bytes", "Virtual memory usage in bytes of the current process" },
50+
MetricGauge { SYSTEM_NETWORK_BYTES_SENT_TOTAL, "system_network_bytes_sent_total", "Total bytes sent across all network interfaces since system start" },
51+
MetricGauge { SYSTEM_NETWORK_BYTES_RECEIVED_TOTAL, "system_network_bytes_received_total", "Total bytes received across all network interfaces since system start" },
52+
MetricGauge { SYSTEM_NETWORK_BYTES_SENT_CURRENT, "system_network_bytes_sent_current", "Bytes sent across all network interfaces since last measurement" },
53+
MetricGauge { SYSTEM_NETWORK_BYTES_RECEIVED_CURRENT, "system_network_bytes_received_current", "Bytes received across all network interfaces since last measurement" },
54+
MetricGauge { SYSTEM_TOTAL_MEMORY_BYTES, "system_total_memory_bytes", "Total system memory in bytes" },
55+
MetricGauge { SYSTEM_AVAILABLE_MEMORY_BYTES, "system_available_memory_bytes", "Available system memory in bytes" },
56+
MetricGauge { SYSTEM_USED_MEMORY_BYTES, "system_used_memory_bytes", "Used system memory in bytes" },
57+
MetricGauge { SYSTEM_CPU_COUNT, "system_cpu_count", "Number of logical CPU cores in the system" },
58+
59+
MetricCounter { NETWORK_RESET_TOTAL, "network_reset_total", "Total number of network resets performed", init = 0 },
60+
LabeledMetricCounter { NETWORK_DROPPED_BROADCAST_MESSAGES, "network_dropped_broadcast_messages", "Number of dropped broadcast messages by reason", init = 0, labels = NETWORK_BROADCAST_DROP_LABELS },
61+
LabeledMetricCounter { NETWORK_EVENT_COUNTER, "network_event_counter", "Network events counter by type", init = 0, labels = EVENT_TYPE_LABELS },
62+
},
63+
);
64+
65+
pub fn update_broadcast_metrics(message_size_bytes: usize, broadcast_heartbeat: Duration) {
66+
BROADCAST_MESSAGE_HEARTBEAT_MILLIS.set(broadcast_heartbeat.as_millis().into_f64());
67+
BROADCAST_MESSAGE_THROUGHPUT.set(get_throughput(message_size_bytes, broadcast_heartbeat));
68+
}
69+
70+
pub fn receive_stress_test_message(received_message: Vec<u8>) {
71+
let end_time = SystemTime::now();
72+
73+
let received_message: StressTestMessage = received_message.into();
74+
let start_time = received_message.metadata.time;
75+
let delay_seconds = match end_time.duration_since(start_time) {
76+
Ok(duration) => duration.as_secs_f64(),
77+
Err(_) => {
78+
let negative_duration = start_time.duration_since(end_time).unwrap();
79+
-negative_duration.as_secs_f64()
80+
}
81+
};
82+
83+
// Use apollo_metrics for all metrics including labeled ones
84+
RECEIVE_MESSAGE_BYTES.set(received_message.len().into_f64());
85+
RECEIVE_MESSAGE_COUNT.increment(1);
86+
RECEIVE_MESSAGE_BYTES_SUM.increment(
87+
u64::try_from(received_message.len()).expect("Message length too large for u64"),
88+
);
89+
90+
// Use apollo_metrics histograms for latency measurements
91+
if delay_seconds.is_sign_positive() {
92+
RECEIVE_MESSAGE_DELAY_SECONDS.record(delay_seconds);
93+
} else {
94+
RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS.record(-delay_seconds);
95+
}
96+
}
97+
98+
pub fn seconds_since_epoch() -> u64 {
99+
let now = SystemTime::now();
100+
now.duration_since(UNIX_EPOCH).unwrap().as_secs()
101+
}
102+
103+
/// Calculates the throughput given the message and how much to sleep between each two consecutive
104+
/// broadcasts
105+
pub fn get_throughput(message_size_bytes: usize, heartbeat_duration: Duration) -> f64 {
106+
let tps = Duration::from_secs(1).as_secs_f64() / heartbeat_duration.as_secs_f64();
107+
tps * message_size_bytes.into_f64()
108+
}
109+
110+
/// Creates comprehensive network metrics for monitoring the stress test network performance.
111+
/// Uses the lazy static metrics defined above.
112+
pub fn create_network_metrics() -> NetworkMetrics {
113+
// Create broadcast metrics for the stress test topic
114+
let stress_test_broadcast_metrics = BroadcastNetworkMetrics {
115+
num_sent_broadcast_messages: NETWORK_STRESS_TEST_SENT_MESSAGES,
116+
num_dropped_broadcast_messages: NETWORK_DROPPED_BROADCAST_MESSAGES,
117+
num_received_broadcast_messages: NETWORK_STRESS_TEST_RECEIVED_MESSAGES,
118+
};
119+
120+
// Create a map with broadcast metrics for our stress test topic
121+
let mut broadcast_metrics_by_topic = HashMap::new();
122+
broadcast_metrics_by_topic.insert(TOPIC.hash(), stress_test_broadcast_metrics);
123+
124+
// Create SQMR metrics for session monitoring
125+
let sqmr_metrics = SqmrNetworkMetrics {
126+
num_active_inbound_sessions: NETWORK_ACTIVE_INBOUND_SESSIONS,
127+
num_active_outbound_sessions: NETWORK_ACTIVE_OUTBOUND_SESSIONS,
128+
};
129+
130+
// Create event metrics for network events monitoring
131+
let event_metrics = EventMetrics { event_counter: NETWORK_EVENT_COUNTER };
132+
133+
NetworkMetrics {
134+
num_connected_peers: NETWORK_CONNECTED_PEERS,
135+
num_blacklisted_peers: NETWORK_BLACKLISTED_PEERS,
136+
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
137+
sqmr_metrics: Some(sqmr_metrics),
138+
event_metrics: Some(event_metrics),
139+
}
140+
}
141+
142+
pub async fn monitor_process_metrics(interval_seconds: u64) {
143+
let mut interval = interval(Duration::from_secs(interval_seconds));
144+
let current_pid = sysinfo::get_current_pid().expect("Failed to get current process PID");
145+
146+
// Initialize networks for network interface monitoring
147+
let mut networks = Networks::new_with_refreshed_list();
148+
149+
// Initialize empty system for CPU monitoring - we'll refresh only what we need
150+
let mut system = System::new_all();
151+
152+
loop {
153+
interval.tick().await;
154+
155+
// Refresh only the specific data we need
156+
// system.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());
157+
// system.refresh_cpu_specifics(CpuRefreshKind::new().with_cpu_usage());
158+
// system.refresh_processes_specifics(
159+
// ProcessesToUpdate::Some(&[current_pid]),
160+
// false,
161+
// ProcessRefreshKind::everything(),
162+
// );
163+
// system.refresh_specifics(
164+
// RefreshKind::new()
165+
// .with_cpu(CpuRefreshKind::everything())
166+
// .with_memory(MemoryRefreshKind::everything())
167+
// .with_processes(ProcessRefreshKind::new().spe),
168+
// );
169+
system.refresh_all();
170+
let total_memory: f64 = system.total_memory().into_f64();
171+
let available_memory: f64 = system.available_memory().into_f64();
172+
let used_memory: f64 = system.used_memory().into_f64();
173+
let cpu_count: f64 = system.cpus().len().into_f64();
174+
// let load_avg: f64 = system.load_average().one.into_f64();
175+
176+
SYSTEM_TOTAL_MEMORY_BYTES.set(total_memory);
177+
SYSTEM_AVAILABLE_MEMORY_BYTES.set(available_memory);
178+
SYSTEM_USED_MEMORY_BYTES.set(used_memory);
179+
SYSTEM_CPU_COUNT.set(cpu_count);
180+
181+
if let Some(process) = system.process(current_pid) {
182+
let cpu_usage: f64 = process.cpu_usage().into();
183+
let memory_usage: f64 = process.memory().into_f64();
184+
let virtual_memory_usage: f64 = process.virtual_memory().into_f64();
185+
186+
SYSTEM_PROCESS_CPU_USAGE_PERCENT.set(cpu_usage);
187+
SYSTEM_PROCESS_MEMORY_USAGE_BYTES.set(memory_usage);
188+
SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES.set(virtual_memory_usage);
189+
} else {
190+
warn!("Could not find process information for PID: {}", current_pid);
191+
}
192+
193+
// Refresh network statistics and collect metrics
194+
networks.refresh(false);
195+
196+
let mut total_bytes_sent: u64 = 0;
197+
let mut total_bytes_received: u64 = 0;
198+
let mut current_bytes_sent: u64 = 0;
199+
let mut current_bytes_received: u64 = 0;
200+
201+
for (_interface_name, data) in &networks {
202+
total_bytes_sent += data.total_transmitted();
203+
total_bytes_received += data.total_received();
204+
current_bytes_sent += data.transmitted();
205+
current_bytes_received += data.received();
206+
}
207+
208+
SYSTEM_NETWORK_BYTES_SENT_TOTAL.set(total_bytes_sent.into_f64());
209+
SYSTEM_NETWORK_BYTES_RECEIVED_TOTAL.set(total_bytes_received.into_f64());
210+
SYSTEM_NETWORK_BYTES_SENT_CURRENT.set(current_bytes_sent.into_f64());
211+
SYSTEM_NETWORK_BYTES_RECEIVED_CURRENT.set(current_bytes_received.into_f64());
212+
}
213+
}

0 commit comments

Comments
 (0)