Skip to content

Commit

Permalink
ObservableGauge collect data points since previous collection (#2618)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
fraillt and cijothomas authored Feb 5, 2025
1 parent ad38303 commit d79950d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## vNext

- *Bug fix*: ObservableGauge returns data points recorded since previous collection, despite temporality. Other asynchronous (observable) instruments with Cumulative temporality behave as synchronous ones and return data points on every collection. [#2213](https://github.com/open-telemetry/opentelemetry-rust/issues/2213)

- *Feature*: Introduced a new feature flag, `experimental_metrics_disable_name_validation`, under the `opentelemetry-sdk`, which allows disabling the Instrument Name Validation. This is useful in scenarios where you need to use *special characters*, *Windows Perf Counter Wildcard Path*, or similar cases. For more details, check [#2543](https://github.com/open-telemetry/opentelemetry-rust/pull/2543).
> **WARNING**: While this feature provides flexibility, **be cautious** when using it, as platforms like **Prometheus** impose restrictions on metric names and labels (e.g., no spaces, capital letters, or certain special characters). Using invalid characters may result in compatibility issues or incorrect behavior. Ensure that instrument names comply with the requirements of your target platform to avoid potential problems.
Expand Down
10 changes: 7 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ impl<T: Number> AggregateBuilder<T> {
}

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> AggregateFns<T> {
LastValue::new(self.temporality, self.filter.clone()).into()
pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
LastValue::new(
overwrite_temporality.unwrap_or(self.temporality),
self.filter.clone(),
)
.into()
}

/// Builds a precomputed sum aggregate function input and output.
Expand Down Expand Up @@ -210,7 +214,7 @@ mod tests {
#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
Expand Down
32 changes: 23 additions & 9 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,22 +1394,32 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn asynchronous_instruments_cumulative_with_gap_in_measurements() {
async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
// Run this test with stdout enabled to see output.
// cargo test asynchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
// cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture

asynchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"gauge", true,
);
// TODO fix: all asynchronous instruments should not emit data points if not measured
// but these implementations are still buggy
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"counter", false,
);
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"updown_counter",
false,
);
}

fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper(
fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
instrument_name: &'static str,
should_not_emit: bool,
) {
let mut test_context = TestContext::new(Temporality::Cumulative);
let attributes = Arc::new([KeyValue::new("key1", "value1")]);

// Create instrument and emit measurements
// Create instrument and emit measurements once
match instrument_name {
"counter" => {
let has_run = AtomicBool::new(false);
Expand Down Expand Up @@ -1466,8 +1476,12 @@ mod tests {

test_context.flush_metrics();

// Test that latest export has the same data as the previous one
assert_correct_export(&mut test_context, instrument_name);
if should_not_emit {
test_context.check_no_metrics();
} else {
// Test that latest export has the same data as the previous one
assert_correct_export(&mut test_context, instrument_name);
}

fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
match instrument_name {
Expand Down
18 changes: 16 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{

use self::internal::AggregateFns;

use super::Aggregation;
use super::{Aggregation, Temporality};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
Expand Down Expand Up @@ -488,9 +488,20 @@ fn aggregate_fn<T: Number>(
match agg {
Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
Aggregation::Drop => Ok(None),
Aggregation::LastValue => Ok(Some(b.last_value())),
Aggregation::LastValue => {
match kind {
InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
// temporality for LastValue only affects how data points are reported, so we can always use
// delta temporality, because observable instruments should report data points only since previous collection
InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
_ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
}
}
Aggregation::Sum => {
let fns = match kind {
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
Expand All @@ -508,6 +519,9 @@ fn aggregate_fn<T: Number>(
| InstrumentKind::ObservableUpDownCounter
| InstrumentKind::ObservableGauge
);
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
Ok(Some(b.explicit_bucket_histogram(
boundaries.to_vec(),
*record_min_max,
Expand Down

0 comments on commit d79950d

Please sign in to comment.