Skip to content

Commit

Permalink
Remove export timeout configuration for PeriodicReader (#2598)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Feb 5, 2025
1 parent 15b5fa4 commit 5f7f2d5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 46 deletions.
14 changes: 12 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,13 @@ limit.
`opentelemetry_sdk::trace::{InMemorySpanExporter, InMemorySpanExporterBuilder};`
`opentelemetry_sdk::metrics::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};`

- *Breaking*: The `BatchLogProcessor` no longer supports configuration of `max_export_timeout`
- **Breaking**: The `BatchLogProcessor` no longer supports configuration of `max_export_timeout`
or the `OTEL_BLRP_EXPORT_TIMEOUT` environment variable. Timeout handling is now the
responsibility of the exporter.
For example, in the OTLP Logs exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`.
- The opentelemetry_otlp API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

Before:
```rust
let processor = BatchLogProcessor::builder(exporter)
Expand Down Expand Up @@ -340,12 +341,13 @@ let processor = BatchLogProcessor::builder(exporter)
.build();
```

- *Breaking*: The `BatchSpanProcessor` no longer supports configuration of `max_export_timeout`
- **Breaking**: The `BatchSpanProcessor` no longer supports configuration of `max_export_timeout`
or the `OTEL_BLRP_EXPORT_TIMEOUT` environment variable. Timeout handling is now the
responsibility of the exporter.
For example, in the OTLP Span exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`.
- The opentelemetry_otlp API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

Before:
```rust
let processor = BatchSpanProcessor::builder(exporter)
Expand Down Expand Up @@ -373,6 +375,14 @@ let processor = BatchSpanProcessor::builder(exporter)
.build();
```

- **Breaking**: The `PeriodicReader` no longer supports configuration of export timeout using
`with_timeout` API method.
Timeout handling is now the responsibility of the exporter.

For example, in the OTLP Metrics exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`.
- The `opentelemetry_otlp` API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

- **Breaking**
- The public API changes in the Tracing:
- Before:
Expand Down
54 changes: 10 additions & 44 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ use super::{
data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality,
};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";

/// Configuration options for [PeriodicReader].
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E> {
interval: Duration,
timeout: Duration,
exporter: E,
}

Expand All @@ -43,16 +40,8 @@ where
.ok()
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_INTERVAL);
let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
.ok()
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_TIMEOUT);

PeriodicReaderBuilder {
interval,
timeout,
exporter,
}
PeriodicReaderBuilder { interval, exporter }
}

/// Configures the intervening time between exports for a [PeriodicReader].
Expand All @@ -69,25 +58,9 @@ where
self
}

/// Configures the timeout for an export to complete. PeriodicReader itself
/// does not enforce timeout. Instead timeout is passed on to the exporter
/// for each export attempt.
///
/// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT`
/// environment variable.
///
/// If this option is not used or `timeout` is equal to zero, 30 seconds is used
/// as the default.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
if !timeout.is_zero() {
self.timeout = timeout;
}
self
}

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader {
PeriodicReader::new(self.exporter, self.interval, self.timeout)
PeriodicReader::new(self.exporter, self.interval)
}
}

Expand Down Expand Up @@ -165,7 +138,7 @@ impl PeriodicReader {
PeriodicReaderBuilder::new(exporter)
}

fn new<E>(exporter: E, interval: Duration, timeout: Duration) -> Self
fn new<E>(exporter: E, interval: Duration) -> Self
where
E: PushMetricExporter,
{
Expand All @@ -189,7 +162,6 @@ impl PeriodicReader {
otel_info!(
name: "PeriodReaderThreadStarted",
interval_in_millisecs = interval.as_millis(),
timeout_in_millisecs = timeout.as_millis()
);
loop {
otel_debug!(
Expand All @@ -200,8 +172,7 @@ impl PeriodicReader {
otel_debug!(
name: "PeriodReaderThreadExportingDueToFlush"
);

let export_result = cloned_reader.collect_and_export(timeout);
let export_result = cloned_reader.collect_and_export();
otel_debug!(
name: "PeriodReaderInvokedExport",
export_result = format!("{:?}", export_result)
Expand Down Expand Up @@ -257,7 +228,7 @@ impl PeriodicReader {
Ok(Message::Shutdown(response_sender)) => {
// Perform final export and break out of loop and exit the thread
otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
let export_result = cloned_reader.collect_and_export(timeout);
let export_result = cloned_reader.collect_and_export();
otel_debug!(
name: "PeriodReaderInvokedExport",
export_result = format!("{:?}", export_result)
Expand Down Expand Up @@ -305,7 +276,7 @@ impl PeriodicReader {
name: "PeriodReaderThreadExportingDueToTimer"
);

let export_result = cloned_reader.collect_and_export(timeout);
let export_result = cloned_reader.collect_and_export();
otel_debug!(
name: "PeriodReaderInvokedExport",
export_result = format!("{:?}", export_result)
Expand Down Expand Up @@ -357,8 +328,8 @@ impl PeriodicReader {
reader
}

fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
self.inner.collect_and_export(timeout)
fn collect_and_export(&self) -> OTelSdkResult {
self.inner.collect_and_export()
}
}

Expand Down Expand Up @@ -402,23 +373,18 @@ impl PeriodicReaderInner {
}
}

fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
fn collect_and_export(&self) -> OTelSdkResult {
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
// owned data structures to be passed to exporters.
let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};

// Measure time taken for collect, and subtract it from the timeout.
let current_time = Instant::now();
let collect_result = self.collect(&mut rm);
let time_taken_for_collect = current_time.elapsed();
let _timeout = if time_taken_for_collect > timeout {
Duration::from_secs(0)
} else {
timeout - time_taken_for_collect
};

#[allow(clippy::question_mark)]
if let Err(e) = collect_result {
otel_warn!(
Expand Down

0 comments on commit 5f7f2d5

Please sign in to comment.