Skip to content

Commit 281e84b

Browse files
committed
feat: add back path congestion metrics
1 parent 1d08068 commit 281e84b

File tree

3 files changed

+226
-60
lines changed

3 files changed

+226
-60
lines changed

iroh/src/magicsock/metrics.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use iroh_metrics::{Counter, MetricsGroup};
1+
use iroh_metrics::{Counter, Histogram, MetricsGroup};
22
use serde::{Deserialize, Serialize};
33

44
/// Enum of metrics for the module
@@ -14,8 +14,11 @@ pub struct Metrics {
1414
pub send_ipv4: Counter,
1515
pub send_ipv6: Counter,
1616
pub send_relay: Counter,
17+
pub send_relay_error: Counter,
1718

1819
// Data packets (non-disco)
20+
pub send_data: Counter,
21+
pub send_data_network_down: Counter,
1922
pub recv_data_relay: Counter,
2023
pub recv_data_ipv4: Counter,
2124
pub recv_data_ipv6: Counter,
@@ -69,25 +72,27 @@ pub struct Metrics {
6972
pub actor_tick_direct_addr_heartbeat: Counter,
7073
pub actor_link_change: Counter,
7174
pub actor_tick_other: Counter,
72-
// /// Histogram of connection latency in milliseconds across all endpoint connections.
73-
// #[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))]
74-
// pub connection_latency_ms: Histogram,
75-
// /*
76-
// * Path Congestion Metrics
77-
// */
75+
76+
/// Histogram of connection latency in milliseconds across all endpoint connections.
77+
#[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))]
78+
pub connection_latency_ms: Histogram,
79+
80+
/*
81+
* Path Congestion Metrics
82+
*/
7883
// /// Number of times a path was marked as outdated due to consecutive ping failures.
7984
// pub path_marked_outdated: Counter,
8085
// /// Number of ping failures recorded across all paths.
8186
// pub path_ping_failures: Counter,
8287
// /// Number of consecutive failure resets (path recovered).
8388
// pub path_failure_resets: Counter,
84-
// /// Histogram of packet loss rates (0.0-1.0) observed on UDP paths.
85-
// #[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))]
86-
// pub path_packet_loss_rate: Histogram,
87-
// /// Histogram of RTT variance (in milliseconds) as a congestion indicator.
88-
// #[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))]
89-
// pub path_rtt_variance_ms: Histogram,
90-
// /// Histogram of path quality scores (0.0-1.0).
91-
// #[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))]
92-
// pub path_quality_score: Histogram,
89+
/// Histogram of packet loss rates (0.0-1.0) observed on UDP paths.
90+
#[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))]
91+
pub path_packet_loss_rate: Histogram,
92+
/// Histogram of RTT variance (in milliseconds) as a congestion indicator.
93+
#[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))]
94+
pub path_rtt_variance_ms: Histogram,
95+
/// Histogram of path quality scores (0.0-1.0).
96+
#[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))]
97+
pub path_quality_score: Histogram,
9398
}

iroh/src/magicsock/remote_map/remote_state.rs

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ use crate::{
4141
mod guarded_channel;
4242
mod path_state;
4343

44+
#[cfg(feature = "metrics")]
45+
mod metrics;
46+
4447
// TODO: use this
4548
// /// Number of addresses that are not active that we keep around per endpoint.
4649
// ///
@@ -79,6 +82,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30;
7982
/// in a high frequency, and to keep data about previous path around for subsequent connections.
8083
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
8184

85+
/// Interval in which connection and path metrics are emitted.
86+
const METRICS_INTERVAL: Duration = Duration::from_secs(10);
87+
8288
/// A stream of events from all paths for all connections.
8389
///
8490
/// The connection is identified using [`ConnId`]. The event `Err` variant happens when the
@@ -224,6 +230,7 @@ impl RemoteStateActor {
224230
trace!("actor started");
225231
let idle_timeout = MaybeFuture::None;
226232
tokio::pin!(idle_timeout);
233+
let mut metrics_interval = time::interval(METRICS_INTERVAL);
227234
loop {
228235
let scheduled_path_open = match self.scheduled_open_path {
229236
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
@@ -266,6 +273,9 @@ impl RemoteStateActor {
266273
self.scheduled_holepunch = None;
267274
self.trigger_holepunching().await;
268275
}
276+
_ = metrics_interval.tick() => {
277+
self.record_metrics();
278+
}
269279
_ = &mut idle_timeout => {
270280
if self.connections.is_empty() && inbox.close_if_idle() {
271281
trace!("idle timeout expired and still idle: terminate actor");
@@ -388,7 +398,8 @@ impl RemoteStateActor {
388398
paths: Default::default(),
389399
open_paths: Default::default(),
390400
path_ids: Default::default(),
391-
transport_summary: TransportSummary::default(),
401+
#[cfg(feature = "metrics")]
402+
metrics: Default::default(),
392403
})
393404
.into_mut();
394405

@@ -575,8 +586,10 @@ impl RemoteStateActor {
575586

576587
fn handle_connection_close(&mut self, conn_id: ConnId) {
577588
if let Some(state) = self.connections.remove(&conn_id) {
578-
self.metrics.num_conns_closed.inc();
579-
state.transport_summary.record(&self.metrics);
589+
#[cfg(feature = "metrics")]
590+
state.metrics.record_closed(&self.metrics);
591+
#[cfg(not(feature = "metrics"))]
592+
let _ = state;
580593
}
581594
if self.connections.is_empty() {
582595
trace!("last connection closed - clearing selected_path");
@@ -1025,6 +1038,13 @@ impl RemoteStateActor {
10251038
}
10261039
}
10271040
}
1041+
1042+
fn record_metrics(&mut self) {
1043+
#[cfg(feature = "metrics")]
1044+
for state in self.connections.values_mut() {
1045+
state.record_metrics_periodic(&self.metrics, self.selected_path.get());
1046+
}
1047+
}
10281048
}
10291049

10301050
/// Messages to send to the [`RemoteStateActor`].
@@ -1127,8 +1147,11 @@ struct ConnectionState {
11271147
open_paths: FxHashMap<PathId, transports::Addr>,
11281148
/// Reverse map of [`Self::paths`].
11291149
path_ids: FxHashMap<transports::Addr, PathId>,
1130-
/// Summary over transports used in this connection, for metrics tracking.
1131-
transport_summary: TransportSummary,
1150+
/// Tracker for stateful metrics for this connection and its paths.
1151+
///
1152+
/// Feature-gated on the `metrics` feature because it increases memory use.
1153+
#[cfg(feature = "metrics")]
1154+
metrics: self::metrics::MetricsTracker,
11321155
}
11331156

11341157
impl ConnectionState {
@@ -1140,7 +1163,8 @@ impl ConnectionState {
11401163

11411164
/// Tracks an open path for the connection.
11421165
fn add_open_path(&mut self, remote: transports::Addr, path_id: PathId) {
1143-
self.transport_summary.add_path(&remote);
1166+
#[cfg(feature = "metrics")]
1167+
self.metrics.add_path(path_id, &remote);
11441168
self.paths.insert(path_id, remote.clone());
11451169
self.open_paths.insert(path_id, remote.clone());
11461170
self.path_ids.insert(remote, path_id);
@@ -1153,11 +1177,15 @@ impl ConnectionState {
11531177
self.path_ids.remove(&addr);
11541178
}
11551179
self.open_paths.remove(path_id);
1180+
#[cfg(feature = "metrics")]
1181+
self.metrics.remove_path(path_id);
11561182
}
11571183

11581184
/// Removes the path from the open paths.
11591185
fn remove_open_path(&mut self, path_id: &PathId) {
11601186
self.open_paths.remove(path_id);
1187+
#[cfg(feature = "metrics")]
1188+
self.metrics.remove_path(path_id);
11611189

11621190
self.update_pub_path_info();
11631191
}
@@ -1177,6 +1205,19 @@ impl ConnectionState {
11771205

11781206
self.pub_open_paths.set(new).ok();
11791207
}
1208+
1209+
/// Feeds this connection's open paths into the periodic metrics tracker.
///
/// Does nothing when the connection handle can no longer be upgraded;
/// otherwise forwards the upgraded connection, the open paths and the
/// selected path to the tracker.
#[cfg(feature = "metrics")]
fn record_metrics_periodic(
    &mut self,
    metrics: &MagicsockMetrics,
    selected_path: Option<transports::Addr>,
) {
    if let Some(conn) = self.handle.upgrade() {
        self.metrics
            .record_periodic(metrics, &conn, &self.open_paths, selected_path);
    }
}
11801221
}
11811222

11821223
/// Watcher for the open paths and selected transmission path in a connection.
@@ -1387,41 +1428,3 @@ impl Future for OnClosed {
13871428
Poll::Ready(self.conn_id)
13881429
}
13891430
}
1390-
1391-
/// Used for metrics tracking.
1392-
#[derive(Debug, Clone, Copy, Default)]
1393-
enum TransportSummary {
1394-
#[default]
1395-
None,
1396-
IpOnly,
1397-
RelayOnly,
1398-
IpAndRelay,
1399-
}
1400-
1401-
impl TransportSummary {
1402-
fn add_path(&mut self, addr: &transports::Addr) {
1403-
use transports::Addr;
1404-
*self = match (*self, addr) {
1405-
(TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly,
1406-
(TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => {
1407-
Self::RelayOnly
1408-
}
1409-
_ => Self::IpAndRelay,
1410-
}
1411-
}
1412-
1413-
fn record(&self, metrics: &MagicsockMetrics) {
1414-
match self {
1415-
TransportSummary::IpOnly => {
1416-
metrics.num_conns_transport_ip_only.inc();
1417-
}
1418-
TransportSummary::RelayOnly => {
1419-
metrics.num_conns_transport_relay_only.inc();
1420-
}
1421-
TransportSummary::IpAndRelay => {
1422-
metrics.num_conns_transport_ip_and_relay.inc();
1423-
}
1424-
TransportSummary::None => {}
1425-
}
1426-
}
1427-
}
iroh/src/magicsock/remote_map/remote_state/metrics.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
//! Tracker for stateful metrics for connections and paths.
2+
3+
use std::time::Duration;
4+
5+
use quinn_proto::PathId;
6+
use rustc_hash::FxHashMap;
7+
8+
use crate::{magicsock::transports, metrics::MagicsockMetrics};
9+
10+
#[derive(Debug, Default)]
11+
pub(super) struct MetricsTracker {
12+
transport_summary: TransportSummary,
13+
path_rtt_variance: FxHashMap<PathId, RttVariance>,
14+
}
15+
16+
impl MetricsTracker {
17+
pub(super) fn add_path(&mut self, path_id: PathId, remote: &transports::Addr) {
18+
self.transport_summary.add_path(remote);
19+
self.path_rtt_variance.insert(path_id, Default::default());
20+
}
21+
22+
pub(super) fn remove_path(&mut self, path_id: &PathId) {
23+
self.path_rtt_variance.remove(path_id);
24+
}
25+
26+
pub(super) fn record_periodic(
27+
&mut self,
28+
metrics: &MagicsockMetrics,
29+
conn: &quinn::Connection,
30+
path_remotes: &FxHashMap<PathId, transports::Addr>,
31+
selected_path: Option<transports::Addr>,
32+
) {
33+
for (path_id, remote) in path_remotes.iter() {
34+
let Some(stats) = conn.path_stats(*path_id) else {
35+
continue;
36+
};
37+
38+
let loss_rate = if stats.sent_packets == 0 {
39+
0.0
40+
} else {
41+
stats.lost_packets as f64 / stats.sent_packets as f64
42+
};
43+
metrics.path_packet_loss_rate.observe(loss_rate);
44+
45+
if Some(remote) == selected_path.as_ref() {
46+
metrics
47+
.connection_latency_ms
48+
.observe(stats.rtt.as_millis() as f64);
49+
}
50+
51+
let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) else {
52+
continue;
53+
};
54+
rtt_variance.add_rtt_sample(stats.rtt);
55+
if let Some(variance) = rtt_variance.rtt_variance() {
56+
metrics
57+
.path_rtt_variance_ms
58+
.observe(variance.as_millis() as f64);
59+
}
60+
61+
let quality = rtt_variance.quality_score(loss_rate);
62+
metrics.path_quality_score.observe(quality);
63+
}
64+
}
65+
66+
pub(super) fn record_closed(&self, metrics: &MagicsockMetrics) {
67+
metrics.num_conns_closed.inc();
68+
match self.transport_summary {
69+
TransportSummary::IpOnly => {
70+
metrics.num_conns_transport_ip_only.inc();
71+
}
72+
TransportSummary::RelayOnly => {
73+
metrics.num_conns_transport_relay_only.inc();
74+
}
75+
TransportSummary::IpAndRelay => {
76+
metrics.num_conns_transport_ip_and_relay.inc();
77+
}
78+
TransportSummary::None => {}
79+
}
80+
}
81+
}
82+
83+
/// Tracks the spread of recent RTT samples on a path, as a congestion marker.
///
/// Samples are kept in a fixed-size ring buffer: once every slot is filled,
/// each new sample overwrites the oldest one.
#[derive(Debug, Default)]
struct RttVariance {
    /// Ring buffer of the most recent RTT samples; `None` slots are not filled yet.
    samples: [Option<Duration>; 8],
    /// Slot the next sample is written to; always smaller than `samples.len()`.
    index: usize,
}

impl RttVariance {
    /// RTT spread above which [`Self::quality_score`] applies its strongest penalty.
    const HIGH_SPREAD: Duration = Duration::from_millis(50);
    /// RTT spread above which [`Self::quality_score`] applies its moderate penalty.
    const MODERATE_SPREAD: Duration = Duration::from_millis(20);

    /// Stores `rtt` in the next slot, overwriting the oldest sample once the
    /// buffer is full.
    fn add_rtt_sample(&mut self, rtt: Duration) {
        self.samples[self.index] = Some(rtt);
        self.index = (self.index + 1) % self.samples.len();
    }

    /// Returns the spread of the stored RTT samples as a congestion indicator.
    ///
    /// Despite the name, the value is the population standard deviation (the
    /// square root of the variance), which is why it can be expressed as a
    /// [`Duration`]. A higher value suggests congestion or an unstable path.
    ///
    /// Returns `None` while fewer than two samples have been recorded.
    fn rtt_variance(&self) -> Option<Duration> {
        // Iterate the ring buffer directly; no intermediate allocation is needed.
        let count = self.samples.iter().flatten().count();
        if count < 2 {
            return None;
        }

        // `count` is at most `samples.len()`, so the cast cannot truncate.
        let mean = self.samples.iter().flatten().sum::<Duration>() / count as u32;
        let mean_secs = mean.as_secs_f64();
        let variance = self
            .samples
            .iter()
            .flatten()
            .map(|sample| {
                let diff = sample.as_secs_f64() - mean_secs;
                diff * diff
            })
            .sum::<f64>()
            / count as f64;

        Some(Duration::from_secs_f64(variance.sqrt()))
    }

    /// Path quality score (0.0 = worst, 1.0 = best).
    ///
    /// The score is `(1.0 - packet_loss)` clamped to `0.0..=1.0`, multiplied by
    /// a factor derived from the RTT spread: `0.7` above 50 ms, `0.85` above
    /// 20 ms, and `1.0` otherwise or when no spread is available yet.
    ///
    /// The spread is compared at full precision. Comparing truncated whole
    /// milliseconds would treat a spread of e.g. 50.5 ms as not exceeding 50 ms.
    fn quality_score(&self, packet_loss: f64) -> f64 {
        let loss_factor = (1.0 - packet_loss).clamp(0.0, 1.0);

        let spread_factor = match self.rtt_variance() {
            Some(spread) if spread > Self::HIGH_SPREAD => 0.7,
            Some(spread) if spread > Self::MODERATE_SPREAD => 0.85,
            Some(_) | None => 1.0,
        };

        loss_factor * spread_factor
    }
}
136+
137+
/// Used for metrics tracking.
138+
#[derive(Debug, Clone, Copy, Default)]
139+
enum TransportSummary {
140+
#[default]
141+
None,
142+
IpOnly,
143+
RelayOnly,
144+
IpAndRelay,
145+
}
146+
147+
impl TransportSummary {
148+
fn add_path(&mut self, addr: &transports::Addr) {
149+
use transports::Addr;
150+
*self = match (*self, addr) {
151+
(TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly,
152+
(TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => {
153+
Self::RelayOnly
154+
}
155+
_ => Self::IpAndRelay,
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)