Skip to content

Commit 2f4092f

Browse files
committed
feat: multipath metrics
1 parent 1a7a88b commit 2f4092f

File tree

3 files changed

+235
-29
lines changed

3 files changed

+235
-29
lines changed

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ use crate::{
3737

3838
mod guarded_channel;
3939

40+
#[cfg(feature = "metrics")]
41+
mod metrics;
42+
4043
// TODO: use this
4144
// /// Number of addresses that are not active that we keep around per endpoint.
4245
// ///
@@ -75,6 +78,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30;
7578
/// in a high frequency, and to keep data about previous path around for subsequent connections.
7679
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
7780

81+
/// Interval in which connection and path metrics are emitted.
82+
const METRICS_INTERVAL: Duration = Duration::from_secs(10);
83+
7884
/// A stream of events from all paths for all connections.
7985
///
8086
/// The connection is identified using [`ConnId`]. The event `Err` variant happens when the
@@ -220,6 +226,7 @@ impl EndpointStateActor {
220226
trace!("actor started");
221227
let idle_timeout = MaybeFuture::None;
222228
tokio::pin!(idle_timeout);
229+
let mut metrics_interval = time::interval(METRICS_INTERVAL);
223230
loop {
224231
let scheduled_path_open = match self.scheduled_open_path {
225232
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
@@ -243,11 +250,7 @@ impl EndpointStateActor {
243250
self.handle_path_event(id, evt);
244251
}
245252
Some(conn_id) = self.connections_close.next(), if !self.connections_close.is_empty() => {
246-
self.connections.remove(&conn_id);
247-
if self.connections.is_empty() {
248-
trace!("last connection closed - clearing selected_path");
249-
self.selected_path.set(None).ok();
250-
}
253+
self.handle_connection_close(conn_id);
251254
}
252255
_ = self.local_addrs.updated() => {
253256
trace!("local addrs updated, triggering holepunching");
@@ -266,6 +269,9 @@ impl EndpointStateActor {
266269
self.scheduled_holepunch = None;
267270
self.trigger_holepunching().await;
268271
}
272+
_ = metrics_interval.tick() => {
273+
self.record_metrics();
274+
}
269275
_ = &mut idle_timeout => {
270276
if self.connections.is_empty() && inbox.close_if_idle() {
271277
trace!("idle timeout expired and still idle: terminate actor");
@@ -369,6 +375,7 @@ impl EndpointStateActor {
369375
) {
370376
let pub_open_paths = Watchable::default();
371377
if let Some(conn) = handle.upgrade() {
378+
self.metrics.num_conns_opened.inc();
372379
// Remove any conflicting stable_ids from the local state.
373380
let conn_id = ConnId(conn.stable_id());
374381
self.connections.remove(&conn_id);
@@ -387,6 +394,8 @@ impl EndpointStateActor {
387394
paths: Default::default(),
388395
open_paths: Default::default(),
389396
path_ids: Default::default(),
397+
#[cfg(feature = "metrics")]
398+
metrics: Default::default(),
390399
})
391400
.into_mut();
392401

@@ -571,6 +580,19 @@ impl EndpointStateActor {
571580
tx.send(rtt).ok();
572581
}
573582

583+
fn handle_connection_close(&mut self, conn_id: ConnId) {
584+
if let Some(state) = self.connections.remove(&conn_id) {
585+
#[cfg(feature = "metrics")]
586+
state.metrics.record_closed(&self.metrics);
587+
#[cfg(not(feature = "metrics"))]
588+
let _ = state;
589+
}
590+
if self.connections.is_empty() {
591+
trace!("last connection closed - clearing selected_path");
592+
self.selected_path.set(None).ok();
593+
}
594+
}
595+
574596
/// Triggers holepunching to the remote endpoint.
575597
///
576598
/// This will manage the entire process of holepunching with the remote endpoint.
@@ -1012,6 +1034,13 @@ impl EndpointStateActor {
10121034
}
10131035
}
10141036
}
1037+
1038+
fn record_metrics(&mut self) {
1039+
#[cfg(feature = "metrics")]
1040+
for state in self.connections.values_mut() {
1041+
state.record_metrics_periodic(&self.metrics, self.selected_path.get());
1042+
}
1043+
}
10151044
}
10161045

10171046
/// Messages to send to the [`EndpointStateActor`].
@@ -1114,6 +1143,11 @@ struct ConnectionState {
11141143
open_paths: FxHashMap<PathId, transports::Addr>,
11151144
/// Reverse map of [`Self::paths].
11161145
path_ids: FxHashMap<transports::Addr, PathId>,
1146+
/// Tracker for stateful metrics for this connection and its paths
1147+
///
1148+
/// Feature-gated on the `metrics` feature because it increases memory use.
1149+
#[cfg(feature = "metrics")]
1150+
metrics: self::metrics::MetricsTracker,
11171151
}
11181152

11191153
impl ConnectionState {
@@ -1125,10 +1159,11 @@ impl ConnectionState {
11251159

11261160
/// Tracks an open path for the connection.
11271161
fn add_open_path(&mut self, remote: transports::Addr, path_id: PathId) {
1162+
#[cfg(feature = "metrics")]
1163+
self.metrics.add_path(path_id, &remote);
11281164
self.paths.insert(path_id, remote.clone());
11291165
self.open_paths.insert(path_id, remote.clone());
11301166
self.path_ids.insert(remote, path_id);
1131-
11321167
self.update_pub_path_info();
11331168
}
11341169

@@ -1138,11 +1173,15 @@ impl ConnectionState {
11381173
self.path_ids.remove(&addr);
11391174
}
11401175
self.open_paths.remove(path_id);
1176+
#[cfg(feature = "metrics")]
1177+
self.metrics.remove_path(path_id);
11411178
}
11421179

11431180
/// Removes the path from the open paths.
11441181
fn remove_open_path(&mut self, path_id: &PathId) {
11451182
self.open_paths.remove(path_id);
1183+
#[cfg(feature = "metrics")]
1184+
self.metrics.remove_path(path_id);
11461185

11471186
self.update_pub_path_info();
11481187
}
@@ -1162,6 +1201,19 @@ impl ConnectionState {
11621201

11631202
self.pub_open_paths.set(new).ok();
11641203
}
1204+
1205+
#[cfg(feature = "metrics")]
1206+
fn record_metrics_periodic(
1207+
&mut self,
1208+
metrics: &MagicsockMetrics,
1209+
selected_path: Option<transports::Addr>,
1210+
) {
1211+
let Some(conn) = self.handle.upgrade() else {
1212+
return;
1213+
};
1214+
self.metrics
1215+
.record_periodic(metrics, &conn, &self.open_paths, selected_path);
1216+
}
11651217
}
11661218

11671219
/// Watcher for the open paths and selected transmission path in a connection.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
//! Tracker for stateful metrics for connections and paths.
2+
3+
use std::time::Duration;
4+
5+
use quinn_proto::PathId;
6+
use rustc_hash::FxHashMap;
7+
8+
use crate::{magicsock::transports, metrics::MagicsockMetrics};
9+
10+
#[derive(Debug, Default)]
11+
pub(super) struct MetricsTracker {
12+
transport_summary: TransportSummary,
13+
path_rtt_variance: FxHashMap<PathId, RttVariance>,
14+
}
15+
16+
impl MetricsTracker {
17+
pub(super) fn add_path(&mut self, path_id: PathId, remote: &transports::Addr) {
18+
self.transport_summary.add_path(&remote);
19+
self.path_rtt_variance.insert(path_id, Default::default());
20+
}
21+
22+
pub(super) fn remove_path(&mut self, path_id: &PathId) {
23+
self.path_rtt_variance.remove(path_id);
24+
}
25+
26+
pub(super) fn record_periodic(
27+
&mut self,
28+
metrics: &MagicsockMetrics,
29+
conn: &quinn::Connection,
30+
path_remotes: &FxHashMap<PathId, transports::Addr>,
31+
selected_path: Option<transports::Addr>,
32+
) {
33+
for (path_id, remote) in path_remotes.iter() {
34+
let Some(stats) = conn.path_stats(*path_id) else {
35+
continue;
36+
};
37+
38+
let loss_rate = if stats.sent_packets == 0 {
39+
0.0
40+
} else {
41+
stats.lost_packets as f64 / stats.sent_packets as f64
42+
};
43+
metrics.path_packet_loss_rate.observe(loss_rate);
44+
45+
if Some(remote) == selected_path.as_ref() {
46+
metrics
47+
.connection_latency_ms
48+
.observe(stats.rtt.as_millis() as f64);
49+
}
50+
51+
let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) else {
52+
continue;
53+
};
54+
rtt_variance.add_rtt_sample(stats.rtt);
55+
if let Some(variance) = rtt_variance.rtt_variance() {
56+
metrics
57+
.path_rtt_variance_ms
58+
.observe(variance.as_millis() as f64);
59+
}
60+
61+
let quality = rtt_variance.quality_score(loss_rate);
62+
metrics.path_quality_score.observe(quality);
63+
}
64+
}
65+
66+
pub(super) fn record_closed(&self, metrics: &MagicsockMetrics) {
67+
metrics.num_conns_closed.inc();
68+
match self.transport_summary {
69+
TransportSummary::IpOnly => {
70+
metrics.num_conns_transport_ip_only.inc();
71+
}
72+
TransportSummary::RelayOnly => {
73+
metrics.num_conns_transport_relay_only.inc();
74+
}
75+
TransportSummary::IpAndRelay => {
76+
metrics.num_conns_transport_ip_and_relay.inc();
77+
}
78+
TransportSummary::None => {}
79+
}
80+
}
81+
}
82+
83+
/// Tracks RTT variance over time, as a congestion marker.
84+
#[derive(Debug, Default)]
85+
struct RttVariance {
86+
/// Rolling window of recent latency measurements (stores up to 8 samples).
87+
samples: [Option<Duration>; 8],
88+
/// Index for next sample insertion (circular buffer).
89+
index: usize,
90+
}
91+
92+
impl RttVariance {
93+
fn add_rtt_sample(&mut self, rtt: Duration) {
94+
self.samples[self.index] = Some(rtt);
95+
self.index = (self.index + 1) % self.samples.len();
96+
}
97+
98+
/// Calculate RTT variance as a congestion indicator.
99+
/// Higher variance suggests congestion or unstable path.
100+
fn rtt_variance(&self) -> Option<Duration> {
101+
let samples: Vec<Duration> = self.samples.iter().filter_map(|&s| s).collect();
102+
103+
if samples.len() < 2 {
104+
return None;
105+
}
106+
107+
let mean = samples.iter().sum::<Duration>() / samples.len() as u32;
108+
let variance: f64 = samples
109+
.iter()
110+
.map(|&s| {
111+
let diff = s.as_secs_f64() - mean.as_secs_f64();
112+
diff * diff
113+
})
114+
.sum::<f64>()
115+
/ samples.len() as f64;
116+
117+
Some(Duration::from_secs_f64(variance.sqrt()))
118+
}
119+
120+
/// Path quality score (0.0 = worst, 1.0 = best).
121+
/// Factors in packet loss and RTT variance.
122+
fn quality_score(&self, packet_loss: f64) -> f64 {
123+
let loss_penalty = (1.0 - packet_loss).clamp(0.0, 1.0);
124+
125+
// Penalize high RTT variance
126+
let variance_penalty = match self.rtt_variance() {
127+
Some(var) if var.as_millis() > 50 => 0.7,
128+
Some(var) if var.as_millis() > 20 => 0.85,
129+
Some(_) => 1.0,
130+
None => 1.0,
131+
};
132+
133+
loss_penalty * variance_penalty
134+
}
135+
}
136+
137+
/// Used for metrics tracking.
138+
#[derive(Debug, Clone, Copy, Default)]
139+
enum TransportSummary {
140+
#[default]
141+
None,
142+
IpOnly,
143+
RelayOnly,
144+
IpAndRelay,
145+
}
146+
147+
impl TransportSummary {
148+
fn add_path(&mut self, addr: &transports::Addr) {
149+
use transports::Addr;
150+
*self = match (*self, addr) {
151+
(TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly,
152+
(TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => {
153+
Self::RelayOnly
154+
}
155+
_ => Self::IpAndRelay,
156+
}
157+
}
158+
}

iroh/src/magicsock/metrics.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,20 @@ pub struct Metrics {
5050

5151
/*
5252
* Connection Metrics
53+
*
54+
* These all only count connections that completed the TLS handshake successfully. This means
55+
* that short lived 0RTT connections are potentially not included in these counts.
5356
*/
54-
/// The number of direct connections we have made to peers.
55-
pub num_direct_conns_added: Counter,
56-
/// The number of direct connections we have lost to peers.
57-
pub num_direct_conns_removed: Counter,
58-
/// The number of connections to peers we have added over relay.
59-
pub num_relay_conns_added: Counter,
60-
/// The number of connections to peers we have removed over relay.
61-
pub num_relay_conns_removed: Counter,
57+
/// Number of connections opened (only handshaked connections are counted).
58+
pub num_conns_opened: Counter,
59+
/// Number of connections closed (only handshaked connections are counted).
60+
pub num_conns_closed: Counter,
61+
/// Number of connections that had only relay paths over their lifetime.
62+
pub num_conns_transport_relay_only: Counter,
63+
/// Number of connections that had only IP paths over their lifetime.
64+
pub num_conns_transport_ip_only: Counter,
65+
/// Number of connections that had both IP and relay paths.
66+
pub num_conns_transport_ip_and_relay: Counter,
6267

6368
pub actor_tick_main: Counter,
6469
pub actor_tick_msg: Counter,
@@ -68,28 +73,19 @@ pub struct Metrics {
6873
pub actor_link_change: Counter,
6974
pub actor_tick_other: Counter,
7075

71-
/// Number of endpoints we have attempted to contact.
72-
pub endpoints_contacted: Counter,
73-
/// Number of endpoints we have managed to contact directly.
74-
pub endpoints_contacted_directly: Counter,
75-
76-
/// Number of connections with a successful handshake.
77-
pub connection_handshake_success: Counter,
78-
/// Number of connections with a successful handshake that became direct.
79-
pub connection_became_direct: Counter,
8076
/// Histogram of connection latency in milliseconds across all endpoint connections.
8177
#[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))]
8278
pub connection_latency_ms: Histogram,
8379

8480
/*
8581
* Path Congestion Metrics
8682
*/
87-
/// Number of times a path was marked as outdated due to consecutive ping failures.
88-
pub path_marked_outdated: Counter,
89-
/// Number of ping failures recorded across all paths.
90-
pub path_ping_failures: Counter,
91-
/// Number of consecutive failure resets (path recovered).
92-
pub path_failure_resets: Counter,
83+
// /// Number of times a path was marked as outdated due to consecutive ping failures.
84+
// pub path_marked_outdated: Counter,
85+
// /// Number of ping failures recorded across all paths.
86+
// pub path_ping_failures: Counter,
87+
// /// Number of consecutive failure resets (path recovered).
88+
// pub path_failure_resets: Counter,
9389
/// Histogram of packet loss rates (0.0-1.0) observed on UDP paths.
9490
#[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))]
9591
pub path_packet_loss_rate: Histogram,

0 commit comments

Comments
 (0)