Skip to content

feat(span_processor): add on_ending callback #3010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["experimental_span_processor_on_ending"]}
opentelemetry-http = { path = "../../opentelemetry-http" }
opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] }
opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" }
Expand Down
46 changes: 45 additions & 1 deletion examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use opentelemetry_sdk::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider},
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{SdkTracerProvider, SpanProcessor},
trace::{SdkTracerProvider, SpanData, SpanProcessor},
};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::{LogExporter, SpanExporter};
Expand Down Expand Up @@ -105,6 +105,49 @@ async fn router(
response
}

fn obfuscate_http_auth_url(s: &str) -> Option<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use a simpler example to show the purpose of on_ending?
eg : checking for a specific attribute and if not present, add it.

#[allow(clippy::unnecessary_to_owned)]
let uri = hyper::http::Uri::from_maybe_shared(s.to_string()).ok()?;
let authority = uri.authority()?;
let (_, url) = authority.as_str().split_once('@')?;
let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}");
let mut parts = uri.into_parts();
parts.authority = Some(hyper::http::uri::Authority::from_maybe_shared(new_auth).ok()?);
Some(hyper::Uri::from_parts(parts).ok()?.to_string())
}

#[derive(Debug)]
/// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes.
///
/// Currently this only overrides http auth information in the URI.
struct SpanObfuscationProcessor;

impl SpanProcessor for SpanObfuscationProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::OTelSdkResult {
Ok(())
}

fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {}

fn on_ending(&self, span: &mut opentelemetry_sdk::trace::Span) {
let mut obfuscated_attributes = Vec::new();
let Some(span) = span.exported_data() else {
return;
};
for KeyValue { key, value, .. } in span.attributes {
if let Some(redacted_uri) = obfuscate_http_auth_url(value.as_str().as_ref()) {
obfuscated_attributes.push((key.clone(), KeyValue::new(key.clone(), redacted_uri)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mm...doesn't looks like the span is mutated with the updated attribute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to add the add_attribute call at the end 🙈

}
}
}

fn on_end(&self, _span: SpanData) {}
}

/// A custom log processor that enriches LogRecords with baggage attributes.
/// Baggage information is not added automatically without this processor.
#[derive(Debug)]
Expand Down Expand Up @@ -159,6 +202,7 @@ fn init_tracer() -> SdkTracerProvider {
// that prints the spans to stdout.
let provider = SdkTracerProvider::builder()
.with_span_processor(EnrichWithBaggageSpanProcessor)
.with_span_processor(SpanObfuscationProcessor)
.with_simple_exporter(SpanExporter::default())
.build();

Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## vNext

- TODO: Placeholder for Span processor related things
- Add `on_ending` API on the span processor. This allows mutating spans while
they are teriminating.


## 0.30.0

Released 2025-May-23
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta
experimental_logs_concurrent_log_processor = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]
experimental_span_processor_on_ending = ["trace"]

[[bench]]
name = "context"
Expand Down
91 changes: 82 additions & 9 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,26 +198,33 @@

impl Span {
fn ensure_ended_and_exported(&mut self, timestamp: Option<SystemTime>) {
// skip if data has already been exported
let mut data = match self.data.take() {
Some(data) => data,
None => return,
};

let provider = self.tracer.provider();
// skip if provider has been shut down
if provider.is_shutdown() {
if self.tracer.provider().is_shutdown() {
return;
}

// skip if data has already been exported
let Some(data) = self.data.as_mut() else {
return;
};
// ensure end time is set via explicit end or implicitly on drop
if let Some(timestamp) = timestamp {
data.end_time = timestamp;
} else if data.end_time == data.start_time {
data.end_time = opentelemetry::time::now();
}

match provider.span_processors() {
#[cfg(feature = "experimental_span_processor_on_ending")]
{
let provider = self.tracer.provider().clone();
for processor in provider.span_processors() {
processor.on_ending(self);
}
}

let Some(data) = self.data.take() else { return };

match self.tracer.provider().span_processors() {
[] => {}
[processor] => {
processor.on_end(build_export_data(
Expand Down Expand Up @@ -725,4 +732,70 @@
// return none if the provider has already been dropped
assert!(dropped_span.exported_data().is_none());
}

#[test]
#[cfg(feature = "experimental_span_processor_on_ending")]
fn test_on_ending_mutate_span() {
use crate::trace::SpanProcessor;

#[derive(Debug)]
struct FirstProcessor;
impl SpanProcessor for FirstProcessor {
fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {}
fn on_end(&self, _span: crate::trace::SpanData) {}
fn on_ending(&self, span: &mut Span) {
span.set_attribute(KeyValue::new("first_processor", "true"));
}

fn force_flush(&self) -> crate::error::OTelSdkResult {
Ok(())
}

Check warning on line 752 in opentelemetry-sdk/src/trace/span.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span.rs#L750-L752

Added lines #L750 - L752 were not covered by tests

fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult {
Ok(())
}
}

#[derive(Debug)]
struct SecondProcessor;
impl SpanProcessor for SecondProcessor {
fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {}
fn on_end(&self, _span: crate::trace::SpanData) {}
fn on_ending(&self, span: &mut Span) {
assert!(span
.exported_data()
.unwrap()
.attributes
.contains(&KeyValue::new("first_processor", "true")));
span.set_attribute(KeyValue::new("second_processor", "true"));
}

fn force_flush(&self) -> crate::error::OTelSdkResult {
Ok(())
}

Check warning on line 775 in opentelemetry-sdk/src/trace/span.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span.rs#L773-L775

Added lines #L773 - L775 were not covered by tests

fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult {
Ok(())
}
}

let exporter = crate::trace::span_processor::tests::MockSpanExporter::new();
let spans = exporter.exported_spans.clone();

let provider = crate::trace::SdkTracerProvider::builder()
.with_span_processor(FirstProcessor)
.with_span_processor(SecondProcessor)
.with_simple_exporter(exporter)
.build();
provider.tracer("test").start("test_span");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test suggestion:
start the span with some attributes.
then add few more attributes to it.
validate that on_start sees only those attributes added at start time.
validate that on_ending sees attributes added at start time, attributes added via span.setattribute, attributes added by any previous processor.


spans.lock().unwrap().iter().for_each(|span| {
assert!(span
.attributes
.contains(&KeyValue::new("first_processor", "true")));
assert!(span
.attributes
.contains(&KeyValue::new("second_processor", "true")));
});
}
}
28 changes: 23 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// synchronously on the thread that started the span, therefore it should
/// not block or throw exceptions.
fn on_start(&self, span: &mut Span, cx: &Context);

#[cfg(feature = "experimental_span_processor_on_ending")]
/// `on_ending` is called when a `Span` is ending. The end timestamp has already
/// been computed.
/// This method is called synchronously within the `Span::end` API, therefore it
/// should not block or throw an exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the "throw exception" part as it won't fit Rust...

///
/// If multiple span processors are registered, their on_ending methods are invoked
/// in the order the span processors have been registered, and mutations to the span
/// will be visible to the next processor.
///
/// The tracer will call `on_ending` for all span processors before calling `on_end`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: skip details about who is calling. It's good enough to mention that ending is called before end.
Also mention the difference between overriding on_start vs on_ending, as on_start only has those span attributes provide at span_cration time.

/// for any of them.
fn on_ending(&self, _span: &mut Span) {
// Default implementation is a no-op so existing processor implementations
// don't break if this feature in enabled transitively.
}

/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
Expand Down Expand Up @@ -852,7 +870,7 @@ impl BatchConfigBuilder {
}

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
pub(crate) mod tests {
// cargo test trace::span_processor::tests:: --features=testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
Expand Down Expand Up @@ -1069,13 +1087,13 @@ mod tests {

// Mock exporter to test functionality
#[derive(Debug)]
struct MockSpanExporter {
exported_spans: Arc<Mutex<Vec<SpanData>>>,
exported_resource: Arc<Mutex<Option<Resource>>>,
pub(crate) struct MockSpanExporter {
pub exported_spans: Arc<Mutex<Vec<SpanData>>>,
pub exported_resource: Arc<Mutex<Option<Resource>>>,
}

impl MockSpanExporter {
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
exported_spans: Arc::new(Mutex::new(Vec::new())),
exported_resource: Arc::new(Mutex::new(None)),
Expand Down