Skip to content

Commit c7cfda9

Browse files
committed
Revert removal of external producers
1 parent d4eb35a commit c7cfda9

File tree

4 files changed

+81
-11
lines changed

4 files changed

+81
-11
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
- TODO: Placeholder for Span processor related things
66
- *Fix* SpanProcessor::on_start is no longer called on non recording spans
7+
- Revert removal of `MetricProducer` which allowed metrics from
8+
external sources to be sent through OpenTelemetry.
79

810
## 0.30.0
911

opentelemetry-sdk/src/metrics/manual_reader.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
use super::{
1414
data::ResourceMetrics,
1515
pipeline::Pipeline,
16-
reader::{MetricReader, SdkProducer},
16+
reader::{MetricProducer, MetricReader, SdkProducer},
1717
};
1818

1919
/// A simple [MetricReader] that allows an application to read metrics on demand.
@@ -50,6 +50,7 @@ impl fmt::Debug for ManualReader {
5050
struct ManualReaderInner {
5151
sdk_producer: Option<Weak<dyn SdkProducer>>,
5252
is_shutdown: bool,
53+
external_producers: Vec<Box<dyn MetricProducer>>,
5354
}
5455

5556
impl ManualReader {
@@ -59,11 +60,15 @@ impl ManualReader {
5960
}
6061

6162
/// A [MetricReader] which is directly called to collect metrics.
62-
pub(crate) fn new(temporality: Temporality) -> Self {
63+
pub(crate) fn new(
64+
temporality: Temporality,
65+
external_producers: Vec<Box<dyn MetricProducer>>,
66+
) -> Self {
6367
ManualReader {
6468
inner: Mutex::new(ManualReaderInner {
6569
sdk_producer: None,
6670
is_shutdown: false,
71+
external_producers,
6772
}),
6873
temporality,
6974
}
@@ -86,7 +91,7 @@ impl MetricReader for ManualReader {
8691
});
8792
}
8893

89-
/// Gathers all metrics from the SDK, calling any
94+
/// Gathers all metrics from the SDK and other [MetricProducer]s, calling any
9095
/// callbacks necessary and returning the results.
9196
///
9297
/// Returns an error if called after shutdown.
@@ -105,7 +110,19 @@ impl MetricReader for ManualReader {
105110
}
106111
};
107112

108-
Ok(())
113+
let mut errs = vec![];
114+
for producer in &inner.external_producers {
115+
match producer.produce() {
116+
Ok(metrics) => rm.scope_metrics.push(metrics),
117+
Err(err) => errs.push(err),
118+
}
119+
}
120+
121+
if errs.is_empty() {
122+
Ok(())
123+
} else {
124+
Err(OTelSdkError::InternalFailure(format!("{:?}", errs)))
125+
}
109126
}
110127

111128
/// ForceFlush is a no-op, it always returns nil.
@@ -123,6 +140,7 @@ impl MetricReader for ManualReader {
123140
// Any future call to collect will now return an error.
124141
inner.sdk_producer = None;
125142
inner.is_shutdown = true;
143+
inner.external_producers = Vec::new();
126144

127145
Ok(())
128146
}
@@ -136,6 +154,7 @@ impl MetricReader for ManualReader {
136154
#[derive(Default)]
137155
pub struct ManualReaderBuilder {
138156
temporality: Temporality,
157+
producers: Vec<Box<dyn MetricProducer>>,
139158
}
140159

141160
impl fmt::Debug for ManualReaderBuilder {
@@ -156,8 +175,17 @@ impl ManualReaderBuilder {
156175
self
157176
}
158177

178+
/// Registers a an external [MetricProducer] with this reader.
179+
///
180+
/// The producer is used as a source of aggregated metric data which is
181+
/// incorporated into metrics collected from the SDK.
182+
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
183+
self.producers.push(Box::new(producer));
184+
self
185+
}
186+
159187
/// Create a new [ManualReader] from this configuration.
160188
pub fn build(self) -> ManualReader {
161-
ManualReader::new(self.temporality)
189+
ManualReader::new(self.temporality, self.producers)
162190
}
163191
}

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
1212

1313
use crate::{
1414
error::{OTelSdkError, OTelSdkResult},
15-
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
15+
metrics::{exporter::PushMetricExporter, reader::MetricProducer, reader::SdkProducer},
1616
Resource,
1717
};
1818

@@ -30,6 +30,7 @@ const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
3030
pub struct PeriodicReaderBuilder<E> {
3131
interval: Duration,
3232
exporter: E,
33+
producers: Vec<Box<dyn MetricProducer>>,
3334
}
3435

3536
impl<E> PeriodicReaderBuilder<E>
@@ -42,7 +43,11 @@ where
4243
.and_then(|v| v.parse().map(Duration::from_millis).ok())
4344
.unwrap_or(DEFAULT_INTERVAL);
4445

45-
PeriodicReaderBuilder { interval, exporter }
46+
PeriodicReaderBuilder {
47+
interval,
48+
exporter,
49+
producers: Vec::new(),
50+
}
4651
}
4752

4853
/// Configures the intervening time between exports for a [PeriodicReader].
@@ -59,6 +64,15 @@ where
5964
self
6065
}
6166

67+
/// Registers a an external [MetricProducer] with this reader.
68+
///
69+
/// The producer is used as a source of aggregated metric data which is
70+
/// incorporated into metrics collected from the SDK.
71+
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
72+
self.producers.push(Box::new(producer));
73+
self
74+
}
75+
6276
/// Create a [PeriodicReader] with the given config.
6377
pub fn build(self) -> PeriodicReader<E> {
6478
PeriodicReader::new(self.exporter, self.interval)
@@ -152,6 +166,7 @@ impl<E: PushMetricExporter> PeriodicReader<E> {
152166
message_sender,
153167
producer: Mutex::new(None),
154168
exporter: exporter_arc.clone(),
169+
external_producers: Vec::new(),
155170
}),
156171
};
157172
let cloned_reader = reader.clone();
@@ -351,6 +366,7 @@ struct PeriodicReaderInner<E: PushMetricExporter> {
351366
exporter: Arc<E>,
352367
message_sender: mpsc::Sender<Message>,
353368
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
369+
external_producers: Vec<Box<dyn MetricProducer>>,
354370
}
355371

356372
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
@@ -364,23 +380,36 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
364380
}
365381

366382
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
383+
let mut errs = vec![];
367384
let producer = self.producer.lock().expect("lock poisoned");
368385
if let Some(p) = producer.as_ref() {
369386
p.upgrade()
370387
.ok_or(OTelSdkError::AlreadyShutdown)?
371388
.produce(rm)?;
372-
Ok(())
373389
} else {
374390
otel_warn!(
375391
name: "PeriodReader.MeterProviderNotRegistered",
376392
message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
377393
This occurs when a periodic reader is created but not associated with a MeterProvider \
378394
by calling `.with_reader(reader)` on MeterProviderBuilder."
379395
);
380-
Err(OTelSdkError::InternalFailure(
396+
errs.push(OTelSdkError::InternalFailure(
381397
"MeterProvider is not registered".into(),
382398
))
383399
}
400+
401+
for producer in &self.external_producers {
402+
match producer.produce() {
403+
Ok(metrics) => rm.scope_metrics.push(metrics),
404+
Err(err) => errs.push(err),
405+
}
406+
}
407+
408+
if errs.is_empty() {
409+
Ok(())
410+
} else {
411+
Err(OTelSdkError::InternalFailure(format!("{:?}", errs)))
412+
}
384413
}
385414

386415
fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {

opentelemetry-sdk/src/metrics/reader.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
//! Interfaces for reading and producing metrics
2-
use crate::error::OTelSdkResult;
2+
use crate::error::{OTelSdkError, OTelSdkResult};
33
use std::time::Duration;
44
use std::{fmt, sync::Weak};
55

6-
use super::{data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, Temporality};
6+
use super::{
7+
data::{ResourceMetrics, ScopeMetrics},
8+
instrument::InstrumentKind,
9+
pipeline::Pipeline,
10+
Temporality,
11+
};
712

813
/// The interface used between the SDK and an exporter.
914
///
@@ -65,3 +70,9 @@ pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
6570
/// Returns aggregated metrics from a single collection.
6671
fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult;
6772
}
73+
74+
/// Produces metrics for a [MetricReader] from an external source.
75+
pub trait MetricProducer: fmt::Debug + Send + Sync {
76+
/// Returns aggregated metrics from an external source.
77+
fn produce(&self) -> Result<ScopeMetrics, OTelSdkError>;
78+
}

0 commit comments

Comments
 (0)