Skip to content

Commit 47c2949

Browse files
committed
refactor: [torrust#1598] make recalculating the UDP avg scrape processing time metric and updating the atomic a single operation
It also fixes a division-by-zero bug that occurred when the metric was updated before the counter for the number of connections had been increased. This change only avoids the division by zero; a proper fix, using an independent request counter for the moving-average calculation, will follow.
1 parent 59fbb39 commit 47c2949

File tree

3 files changed

+55
-48
lines changed

3 files changed

+55
-48
lines changed

packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use torrust_tracker_primitives::DurationSinceUnixEpoch;
44

55
use crate::event::{ConnectionContext, UdpRequestKind, UdpResponseKind};
66
use crate::statistics::repository::Repository;
7-
use crate::statistics::{UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL};
7+
use crate::statistics::UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL;
88

99
pub async fn handle_event(
1010
context: ConnectionContext,
@@ -36,33 +36,20 @@ pub async fn handle_event(
3636
(LabelValue::new("ok"), UdpRequestKind::Announce { announce_request }.into())
3737
}
3838
UdpRequestKind::Scrape => {
39-
let new_avg = stats_repository
40-
.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time)
41-
.await;
42-
43-
tracing::debug!("Updating average processing time metric for scrape requests: {} ns", new_avg);
44-
4539
let mut label_set = LabelSet::from(context.clone());
4640
label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
47-
match stats_repository
48-
.set_gauge(
49-
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
50-
&label_set,
51-
new_avg,
52-
now,
53-
)
54-
.await
55-
{
56-
Ok(()) => {}
57-
Err(err) => tracing::error!("Failed to set gauge: {}", err),
58-
}
41+
42+
let _new_avg = stats_repository
43+
.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time, &label_set, now)
44+
.await;
45+
5946
(LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string()))
6047
}
6148
},
6249
UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()),
6350
};
6451

65-
// Extendable metrics
52+
// Increase the number of responses sent
6653
let mut label_set = LabelSet::from(context);
6754
if result_label_value == LabelValue::new("ok") {
6855
label_set.upsert(label_name!("request_kind"), kind_label_value);

packages/udp-tracker-server/src/statistics/metrics.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -115,34 +115,25 @@ impl Metrics {
115115
new_avg
116116
}
117117

118-
fn update_udp_avg_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
119-
tracing::debug!(
120-
"Updating average processing time metric to {} ns for label set {}",
121-
new_avg,
122-
label_set,
123-
);
124-
125-
match self.set_gauge(
126-
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
127-
label_set,
128-
new_avg,
129-
now,
130-
) {
131-
Ok(()) => {}
132-
Err(err) => tracing::error!("Failed to set gauge: {}", err),
133-
}
134-
}
135-
136118
#[allow(clippy::cast_precision_loss)]
137-
pub fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
119+
pub fn recalculate_udp_avg_scrape_processing_time_ns(
120+
&mut self,
121+
req_processing_time: Duration,
122+
label_set: &LabelSet,
123+
now: DurationSinceUnixEpoch,
124+
) -> f64 {
138125
let req_processing_time = req_processing_time.as_nanos() as f64;
139126

140127
let udp_scrapes_handled = (self.udp4_scrapes_handled() + self.udp6_scrapes_handled()) as f64;
141128

142129
let previous_avg = self.udp_avg_scrape_processing_time_ns();
143130

144-
// Moving average: https://en.wikipedia.org/wiki/Moving_average
145-
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled;
131+
let new_avg = if udp_scrapes_handled == 0.0 {
132+
req_processing_time
133+
} else {
134+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
135+
previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled
136+
};
146137

147138
tracing::debug!(
148139
"Recalculated UDP average scrape processing time: {} ns (previous: {} ns, req_processing_time: {} ns, udp_scrapes_handled: {})",
@@ -152,9 +143,29 @@ impl Metrics {
152143
udp_scrapes_handled
153144
);
154145

146+
self.update_udp_avg_processing_time_ns(new_avg, label_set, now);
147+
155148
new_avg
156149
}
157150

151+
fn update_udp_avg_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
152+
tracing::debug!(
153+
"Updating average processing time metric to {} ns for label set {}",
154+
new_avg,
155+
label_set,
156+
);
157+
158+
match self.set_gauge(
159+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
160+
label_set,
161+
new_avg,
162+
now,
163+
) {
164+
Ok(()) => {}
165+
Err(err) => tracing::error!("Failed to set gauge: {}", err),
166+
}
167+
}
168+
158169
// UDP
159170
/// Total number of UDP (UDP tracker) requests aborted.
160171
#[must_use]

packages/udp-tracker-server/src/statistics/repository.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,15 @@ impl Repository {
103103
new_avg
104104
}
105105

106-
pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
107-
let stats_lock = self.stats.write().await;
106+
pub async fn recalculate_udp_avg_scrape_processing_time_ns(
107+
&self,
108+
req_processing_time: Duration,
109+
label_set: &LabelSet,
110+
now: DurationSinceUnixEpoch,
111+
) -> f64 {
112+
let mut stats_lock = self.stats.write().await;
108113

109-
let new_avg = stats_lock.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time);
114+
let new_avg = stats_lock.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time, label_set, now);
110115

111116
drop(stats_lock);
112117

@@ -436,7 +441,9 @@ mod tests {
436441

437442
// Calculate new average with processing time of 1200ns
438443
let processing_time = Duration::from_nanos(1200);
439-
let new_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await;
444+
let new_avg = repo
445+
.recalculate_udp_avg_scrape_processing_time_ns(processing_time, &scrape_labels, now)
446+
.await;
440447

441448
// Moving average: previous_avg + (new_value - previous_avg) / total_scrapes
442449
// 800 + (1200 - 800) / 4 = 800 + 100 = 900
@@ -465,14 +472,16 @@ mod tests {
465472
.recalculate_udp_avg_announce_processing_time_ns(processing_time, &announce_labels, now)
466473
.await;
467474

468-
let _scrape_labels = LabelSet::from([("request_kind", "scrape")]);
469-
let scrape_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await;
475+
let scrape_labels = LabelSet::from([("request_kind", "scrape")]);
476+
let scrape_avg = repo
477+
.recalculate_udp_avg_scrape_processing_time_ns(processing_time, &scrape_labels, now)
478+
.await;
470479

471480
// With 0 total connections, the formula becomes 0 + (1000 - 0) / 0
472481
// This should handle the division by zero case gracefully
473482
assert!((connect_avg - 1000.0).abs() < f64::EPSILON);
474483
assert!((announce_avg - 1000.0).abs() < f64::EPSILON);
475-
assert!(scrape_avg.is_infinite() || scrape_avg.is_nan());
484+
assert!((scrape_avg - 1000.0).abs() < f64::EPSILON);
476485
}
477486

478487
#[tokio::test]

0 commit comments

Comments
 (0)