Skip to content

Commit 5bb726f

Browse files
committed
feat: add back path congestion metrics
1 parent dc0c8de commit 5bb726f

File tree

3 files changed

+226
-60
lines changed

3 files changed

+226
-60
lines changed

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ use crate::{
3737

3838
mod guarded_channel;
3939

40+
#[cfg(feature = "metrics")]
41+
mod metrics;
42+
4043
// TODO: use this
4144
// /// Number of addresses that are not active that we keep around per endpoint.
4245
// ///
@@ -75,6 +78,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30;
7578
/// in a high frequency, and to keep data about previous path around for subsequent connections.
7679
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
7780

81+
/// Interval in which connection and path metrics are emitted.
82+
const METRICS_INTERVAL: Duration = Duration::from_secs(10);
83+
7884
/// A stream of events from all paths for all connections.
7985
///
8086
/// The connection is identified using [`ConnId`]. The event `Err` variant happens when the
@@ -220,6 +226,7 @@ impl EndpointStateActor {
220226
trace!("actor started");
221227
let idle_timeout = MaybeFuture::None;
222228
tokio::pin!(idle_timeout);
229+
let mut metrics_interval = time::interval(METRICS_INTERVAL);
223230
loop {
224231
let scheduled_path_open = match self.scheduled_open_path {
225232
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
@@ -262,6 +269,9 @@ impl EndpointStateActor {
262269
self.scheduled_holepunch = None;
263270
self.trigger_holepunching().await;
264271
}
272+
_ = metrics_interval.tick() => {
273+
self.record_metrics();
274+
}
265275
_ = &mut idle_timeout => {
266276
if self.connections.is_empty() && inbox.close_if_idle() {
267277
trace!("idle timeout expired and still idle: terminate actor");
@@ -384,7 +394,8 @@ impl EndpointStateActor {
384394
paths: Default::default(),
385395
open_paths: Default::default(),
386396
path_ids: Default::default(),
387-
transport_summary: TransportSummary::default(),
397+
#[cfg(feature = "metrics")]
398+
metrics: Default::default(),
388399
})
389400
.into_mut();
390401

@@ -571,8 +582,10 @@ impl EndpointStateActor {
571582

572583
fn handle_connection_close(&mut self, conn_id: ConnId) {
573584
if let Some(state) = self.connections.remove(&conn_id) {
574-
self.metrics.num_conns_closed.inc();
575-
state.transport_summary.record(&self.metrics);
585+
#[cfg(feature = "metrics")]
586+
state.metrics.record_closed(&self.metrics);
587+
#[cfg(not(feature = "metrics"))]
588+
let _ = state;
576589
}
577590
if self.connections.is_empty() {
578591
trace!("last connection closed - clearing selected_path");
@@ -1021,6 +1034,13 @@ impl EndpointStateActor {
10211034
}
10221035
}
10231036
}
1037+
1038+
fn record_metrics(&mut self) {
1039+
#[cfg(feature = "metrics")]
1040+
for state in self.connections.values_mut() {
1041+
state.record_metrics_periodic(&self.metrics, self.selected_path.get());
1042+
}
1043+
}
10241044
}
10251045

10261046
/// Messages to send to the [`EndpointStateActor`].
@@ -1123,8 +1143,11 @@ struct ConnectionState {
11231143
open_paths: FxHashMap<PathId, transports::Addr>,
11241144
/// Reverse map of [`Self::paths`].
11251145
path_ids: FxHashMap<transports::Addr, PathId>,
1126-
/// Summary over transports used in this connection, for metrics tracking.
1127-
transport_summary: TransportSummary,
1146+
/// Tracker for stateful metrics for this connection and its paths.
1147+
///
1148+
/// Feature-gated on the `metrics` feature because it increases memory use.
1149+
#[cfg(feature = "metrics")]
1150+
metrics: self::metrics::MetricsTracker,
11281151
}
11291152

11301153
impl ConnectionState {
@@ -1136,7 +1159,8 @@ impl ConnectionState {
11361159

11371160
/// Tracks an open path for the connection.
11381161
fn add_open_path(&mut self, remote: transports::Addr, path_id: PathId) {
1139-
self.transport_summary.add_path(&remote);
1162+
#[cfg(feature = "metrics")]
1163+
self.metrics.add_path(path_id, &remote);
11401164
self.paths.insert(path_id, remote.clone());
11411165
self.open_paths.insert(path_id, remote.clone());
11421166
self.path_ids.insert(remote, path_id);
@@ -1149,11 +1173,15 @@ impl ConnectionState {
11491173
self.path_ids.remove(&addr);
11501174
}
11511175
self.open_paths.remove(path_id);
1176+
#[cfg(feature = "metrics")]
1177+
self.metrics.remove_path(path_id);
11521178
}
11531179

11541180
/// Removes the path from the open paths.
11551181
fn remove_open_path(&mut self, path_id: &PathId) {
11561182
self.open_paths.remove(path_id);
1183+
#[cfg(feature = "metrics")]
1184+
self.metrics.remove_path(path_id);
11571185

11581186
self.update_pub_path_info();
11591187
}
@@ -1173,6 +1201,19 @@ impl ConnectionState {
11731201

11741202
self.pub_open_paths.set(new).ok();
11751203
}
1204+
1205+
/// Records the periodic per-path metrics of this connection.
///
/// Does nothing if the connection handle can no longer be upgraded.
#[cfg(feature = "metrics")]
fn record_metrics_periodic(
    &mut self,
    metrics: &MagicsockMetrics,
    selected_path: Option<transports::Addr>,
) {
    if let Some(conn) = self.handle.upgrade() {
        self.metrics
            .record_periodic(metrics, &conn, &self.open_paths, selected_path);
    }
}
11761217
}
11771218

11781219
/// Watcher for the open paths and selected transmission path in a connection.
@@ -1383,41 +1424,3 @@ impl Future for OnClosed {
13831424
Poll::Ready(self.conn_id)
13841425
}
13851426
}
1386-
1387-
/// Used for metrics tracking.
1388-
#[derive(Debug, Clone, Copy, Default)]
1389-
enum TransportSummary {
1390-
#[default]
1391-
None,
1392-
IpOnly,
1393-
RelayOnly,
1394-
IpAndRelay,
1395-
}
1396-
1397-
impl TransportSummary {
1398-
fn add_path(&mut self, addr: &transports::Addr) {
1399-
use transports::Addr;
1400-
*self = match (*self, addr) {
1401-
(TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly,
1402-
(TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => {
1403-
Self::RelayOnly
1404-
}
1405-
_ => Self::IpAndRelay,
1406-
}
1407-
}
1408-
1409-
fn record(&self, metrics: &MagicsockMetrics) {
1410-
match self {
1411-
TransportSummary::IpOnly => {
1412-
metrics.num_conns_transport_ip_only.inc();
1413-
}
1414-
TransportSummary::RelayOnly => {
1415-
metrics.num_conns_transport_relay_only.inc();
1416-
}
1417-
TransportSummary::IpAndRelay => {
1418-
metrics.num_conns_transport_ip_and_relay.inc();
1419-
}
1420-
TransportSummary::None => {}
1421-
}
1422-
}
1423-
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
//! Tracker for stateful metrics for connections and paths.
2+
3+
use std::time::Duration;
4+
5+
use quinn_proto::PathId;
6+
use rustc_hash::FxHashMap;
7+
8+
use crate::{magicsock::transports, metrics::MagicsockMetrics};
9+
10+
#[derive(Debug, Default)]
11+
pub(super) struct MetricsTracker {
12+
transport_summary: TransportSummary,
13+
path_rtt_variance: FxHashMap<PathId, RttVariance>,
14+
}
15+
16+
impl MetricsTracker {
17+
pub(super) fn add_path(&mut self, path_id: PathId, remote: &transports::Addr) {
18+
self.transport_summary.add_path(remote);
19+
self.path_rtt_variance.insert(path_id, Default::default());
20+
}
21+
22+
pub(super) fn remove_path(&mut self, path_id: &PathId) {
23+
self.path_rtt_variance.remove(path_id);
24+
}
25+
26+
pub(super) fn record_periodic(
27+
&mut self,
28+
metrics: &MagicsockMetrics,
29+
conn: &quinn::Connection,
30+
path_remotes: &FxHashMap<PathId, transports::Addr>,
31+
selected_path: Option<transports::Addr>,
32+
) {
33+
for (path_id, remote) in path_remotes.iter() {
34+
let Some(stats) = conn.path_stats(*path_id) else {
35+
continue;
36+
};
37+
38+
let loss_rate = if stats.sent_packets == 0 {
39+
0.0
40+
} else {
41+
stats.lost_packets as f64 / stats.sent_packets as f64
42+
};
43+
metrics.path_packet_loss_rate.observe(loss_rate);
44+
45+
if Some(remote) == selected_path.as_ref() {
46+
metrics
47+
.connection_latency_ms
48+
.observe(stats.rtt.as_millis() as f64);
49+
}
50+
51+
let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) else {
52+
continue;
53+
};
54+
rtt_variance.add_rtt_sample(stats.rtt);
55+
if let Some(variance) = rtt_variance.rtt_variance() {
56+
metrics
57+
.path_rtt_variance_ms
58+
.observe(variance.as_millis() as f64);
59+
}
60+
61+
let quality = rtt_variance.quality_score(loss_rate);
62+
metrics.path_quality_score.observe(quality);
63+
}
64+
}
65+
66+
pub(super) fn record_closed(&self, metrics: &MagicsockMetrics) {
67+
metrics.num_conns_closed.inc();
68+
match self.transport_summary {
69+
TransportSummary::IpOnly => {
70+
metrics.num_conns_transport_ip_only.inc();
71+
}
72+
TransportSummary::RelayOnly => {
73+
metrics.num_conns_transport_relay_only.inc();
74+
}
75+
TransportSummary::IpAndRelay => {
76+
metrics.num_conns_transport_ip_and_relay.inc();
77+
}
78+
TransportSummary::None => {}
79+
}
80+
}
81+
}
82+
83+
/// Tracks the spread of recent RTT samples over time, as a congestion marker.
#[derive(Debug, Default)]
struct RttVariance {
    /// Rolling window of recent latency measurements (stores up to 8 samples).
    samples: [Option<Duration>; 8],
    /// Index for next sample insertion (circular buffer).
    index: usize,
}

impl RttVariance {
    /// RTT spread above which the quality score is multiplied by 0.7.
    const HIGH_SPREAD: Duration = Duration::from_millis(50);
    /// RTT spread above which the quality score is multiplied by 0.85.
    const MODERATE_SPREAD: Duration = Duration::from_millis(20);

    /// Stores a new RTT sample, overwriting the oldest one once the window is full.
    fn add_rtt_sample(&mut self, rtt: Duration) {
        self.samples[self.index] = Some(rtt);
        self.index = (self.index + 1) % self.samples.len();
    }

    /// Calculates the RTT spread as a congestion indicator.
    ///
    /// Despite the name, the returned value is the population standard
    /// deviation of the stored samples (the square root of their variance), so
    /// it is a duration in the same unit as the samples. A higher value
    /// suggests congestion or an unstable path.
    ///
    /// Returns `None` while fewer than two samples are stored.
    fn rtt_variance(&self) -> Option<Duration> {
        // First pass: count and sum the stored samples without allocating.
        let mut count = 0u32;
        let mut total = Duration::ZERO;
        for rtt in self.samples.iter().flatten() {
            count += 1;
            total += *rtt;
        }
        if count < 2 {
            return None;
        }

        // Second pass: mean of the squared deviations from the mean.
        let mean = (total / count).as_secs_f64();
        let squared_deviations: f64 = self
            .samples
            .iter()
            .flatten()
            .map(|rtt| {
                let deviation = rtt.as_secs_f64() - mean;
                deviation * deviation
            })
            .sum();
        let variance = squared_deviations / f64::from(count);

        Some(Duration::from_secs_f64(variance.sqrt()))
    }

    /// Path quality score (0.0 = worst, 1.0 = best).
    ///
    /// Multiplies the delivery rate `1.0 - packet_loss`, clamped to `0.0..=1.0`,
    /// with a penalty factor derived from the RTT spread: 0.7 above 50 ms,
    /// 0.85 above 20 ms, and 1.0 otherwise or while no spread is available.
    fn quality_score(&self, packet_loss: f64) -> f64 {
        let loss_penalty = (1.0 - packet_loss).clamp(0.0, 1.0);

        // Compare durations directly: going through `as_millis()` truncates, so
        // e.g. a spread of 50.5 ms would not count as exceeding 50 ms.
        let variance_penalty = match self.rtt_variance() {
            Some(spread) if spread > Self::HIGH_SPREAD => 0.7,
            Some(spread) if spread > Self::MODERATE_SPREAD => 0.85,
            Some(_) | None => 1.0,
        };

        loss_penalty * variance_penalty
    }
}
136+
137+
/// Used for metrics tracking.
138+
#[derive(Debug, Clone, Copy, Default)]
139+
enum TransportSummary {
140+
#[default]
141+
None,
142+
IpOnly,
143+
RelayOnly,
144+
IpAndRelay,
145+
}
146+
147+
impl TransportSummary {
148+
fn add_path(&mut self, addr: &transports::Addr) {
149+
use transports::Addr;
150+
*self = match (*self, addr) {
151+
(TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly,
152+
(TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => {
153+
Self::RelayOnly
154+
}
155+
_ => Self::IpAndRelay,
156+
}
157+
}
158+
}

iroh/src/magicsock/metrics.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use iroh_metrics::{Counter, MetricsGroup};
1+
use iroh_metrics::{Counter, Histogram, MetricsGroup};
22
use serde::{Deserialize, Serialize};
33

44
/// Enum of metrics for the module
@@ -14,8 +14,11 @@ pub struct Metrics {
1414
pub send_ipv4: Counter,
1515
pub send_ipv6: Counter,
1616
pub send_relay: Counter,
17+
pub send_relay_error: Counter,
1718

1819
// Data packets (non-disco)
20+
pub send_data: Counter,
21+
pub send_data_network_down: Counter,
1922
pub recv_data_relay: Counter,
2023
pub recv_data_ipv4: Counter,
2124
pub recv_data_ipv6: Counter,
@@ -69,25 +72,27 @@ pub struct Metrics {
6972
pub actor_tick_direct_addr_heartbeat: Counter,
7073
pub actor_link_change: Counter,
7174
pub actor_tick_other: Counter,
72-
// /// Histogram of connection latency in milliseconds across all endpoint connections.
73-
// #[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))]
74-
// pub connection_latency_ms: Histogram,
75-
// /*
76-
// * Path Congestion Metrics
77-
// */
75+
76+
/// Histogram of connection latency in milliseconds across all endpoint connections.
77+
#[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))]
78+
pub connection_latency_ms: Histogram,
79+
80+
/*
81+
* Path Congestion Metrics
82+
*/
7883
// /// Number of times a path was marked as outdated due to consecutive ping failures.
7984
// pub path_marked_outdated: Counter,
8085
// /// Number of ping failures recorded across all paths.
8186
// pub path_ping_failures: Counter,
8287
// /// Number of consecutive failure resets (path recovered).
8388
// pub path_failure_resets: Counter,
84-
// /// Histogram of packet loss rates (0.0-1.0) observed on UDP paths.
85-
// #[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))]
86-
// pub path_packet_loss_rate: Histogram,
87-
// /// Histogram of RTT variance (in milliseconds) as a congestion indicator.
88-
// #[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))]
89-
// pub path_rtt_variance_ms: Histogram,
90-
// /// Histogram of path quality scores (0.0-1.0).
91-
// #[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))]
92-
// pub path_quality_score: Histogram,
89+
/// Histogram of packet loss rates (0.0-1.0) observed on UDP paths.
90+
#[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))]
91+
pub path_packet_loss_rate: Histogram,
92+
/// Histogram of RTT variance (in milliseconds) as a congestion indicator.
93+
#[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))]
94+
pub path_rtt_variance_ms: Histogram,
95+
/// Histogram of path quality scores (0.0-1.0).
96+
#[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))]
97+
pub path_quality_score: Histogram,
9398
}

0 commit comments

Comments
 (0)