Skip to content

feat: Simplify opentelemetry-proto: SDK decoupling and gRPC separation #3046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ tonic = { workspace = true, features = ["router", "server"] }
trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "opentelemetry-proto/trace"]
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics", "opentelemetry-proto/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry-proto/logs"]
zpages = ["trace", "opentelemetry-proto/zpages"]
profiles = ["opentelemetry-proto/profiles"]
internal-logs = ["tracing", "opentelemetry/internal-logs"]

# add ons
Expand All @@ -71,9 +73,9 @@ tls = ["tonic/tls-ring"]
tls-roots = ["tls", "tonic/tls-native-roots"]
tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"]

# http binary
http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "http", "trace", "metrics"]
http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "opentelemetry-proto/with-serde", "http", "trace", "metrics"]
# http binary
http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/prost", "http", "trace", "metrics"]
http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/prost", "opentelemetry-proto/with-serde", "http", "trace", "metrics"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"]
Expand Down
12 changes: 7 additions & 5 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use http::{HeaderName, HeaderValue, Uri};
#[cfg(feature = "http-json")]
use opentelemetry::otel_debug;
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
use crate::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
use crate::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "metrics")]
use crate::transform::metrics::tonic::resource_metrics_to_export_request;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
use crate::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::logs::LogBatch;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -263,7 +265,7 @@ pub(crate) struct OtlpHttpClient {
_timeout: Duration,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

impl OtlpHttpClient {
Expand Down Expand Up @@ -330,7 +332,7 @@ impl OtlpHttpClient {
) -> Option<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

let req: ExportMetricsServiceRequest = metrics.into();
let req: ExportMetricsServiceRequest = resource_metrics_to_export_request(metrics);

match self.protocol {
#[cfg(feature = "http-json")]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use std::time;
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
use crate::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use std::sync::Mutex;

use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
metrics_service_client::MetricsServiceClient,
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;
use crate::metric::MetricsClient;
use crate::transform::metrics::tonic::resource_metrics_to_export_request;

pub(crate) struct TonicMetricsClient {
inner: Mutex<Option<ClientInner>>,
Expand Down Expand Up @@ -81,7 +82,7 @@ impl MetricsClient for TonicMetricsClient {
.export(Request::from_parts(
metadata,
extensions,
ExportMetricsServiceRequest::from(metrics),
resource_metrics_to_export_request(metrics),
))
.await;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
use crate::transform::trace::tonic::group_spans_by_resource_and_scope;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::{
error::OTelSdkResult,
Expand All @@ -19,7 +19,7 @@ pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ mod metric;
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
mod span;

// Transform logic moved from opentelemetry-proto for SDK decoupling
/// Transformation utilities for converting SDK types to protobuf types.
pub mod transform;

pub use crate::exporter::Compression;
pub use crate::exporter::ExportConfig;
pub use crate::exporter::ExporterBuildError;
Expand Down
164 changes: 164 additions & 0 deletions opentelemetry-otlp/src/transform/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#[cfg(all(
any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"),
any(feature = "trace", feature = "metrics", feature = "logs")
))]
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(all(
any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"),
any(feature = "trace", feature = "metrics", feature = "logs")
))]
pub(crate) fn to_nanos(time: SystemTime) -> u64 {
time.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_nanos() as u64
}

#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
/// Tonic-specific common transformation utilities.
pub mod tonic {
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
};
use opentelemetry::{Array, Value};
use std::borrow::Cow;

#[cfg(any(feature = "trace", feature = "logs"))]
#[derive(Debug, Default)]
/// Resource attributes with schema information.
pub struct ResourceAttributesWithSchema {
/// Resource attributes.
pub attributes: Attributes,
/// Schema URL for the resource.
pub schema_url: Option<String>,
}

#[cfg(any(feature = "trace", feature = "logs"))]
impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema {
fn from(resource: &opentelemetry_sdk::Resource) -> Self {
ResourceAttributesWithSchema {
attributes: resource_attributes(resource),
schema_url: resource.schema_url().map(ToString::to_string),
}
}
}

#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;

/// Creates instrumentation scope from scope and target.
pub fn instrumentation_scope_from_scope_and_target(
scope: opentelemetry::InstrumentationScope,
target: Option<Cow<'static, str>>,
) -> InstrumentationScope {
if let Some(t) = target {
InstrumentationScope {
name: t.to_string(),
version: String::new(),
attributes: vec![],
..Default::default()
}
} else {
InstrumentationScope {
name: scope.name().to_owned(),
version: scope.version().map(ToOwned::to_owned).unwrap_or_default(),
attributes: Attributes::from(scope.attributes().cloned()).0,
..Default::default()
}
}
}

/// Creates instrumentation scope from scope reference and target.
pub fn instrumentation_scope_from_scope_ref_and_target(
scope: &opentelemetry::InstrumentationScope,
target: Option<Cow<'static, str>>,
) -> InstrumentationScope {
if let Some(t) = target {
InstrumentationScope {
name: t.to_string(),
version: String::new(),
attributes: vec![],
..Default::default()
}
} else {
InstrumentationScope {
name: scope.name().to_owned(),
version: scope.version().map(ToOwned::to_owned).unwrap_or_default(),
attributes: Attributes::from(scope.attributes().cloned()).0,
..Default::default()
}
}
}

/// Wrapper type for Vec<`KeyValue`>
#[derive(Default, Debug)]
pub struct Attributes(pub ::std::vec::Vec<opentelemetry_proto::tonic::common::v1::KeyValue>);

impl<I: IntoIterator<Item = opentelemetry::KeyValue>> From<I> for Attributes {
fn from(kvs: I) -> Self {
Attributes(
kvs.into_iter()
.map(|api_kv| KeyValue {
key: api_kv.key.as_str().to_string(),
value: Some(value_to_any_value(api_kv.value)),
})
.collect(),
)
}
}

#[cfg(feature = "logs")]
impl<K: Into<String>, V: Into<AnyValue>> FromIterator<(K, V)> for Attributes {
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
Attributes(
iter.into_iter()
.map(|(k, v)| KeyValue {
key: k.into(),
value: Some(v.into()),
})
.collect(),
)
}
}

/// Converts OpenTelemetry value to protobuf any value.
pub fn value_to_any_value(value: Value) -> AnyValue {
AnyValue {
value: match value {
Value::Bool(val) => Some(any_value::Value::BoolValue(val)),
Value::I64(val) => Some(any_value::Value::IntValue(val)),
Value::F64(val) => Some(any_value::Value::DoubleValue(val)),
Value::String(val) => Some(any_value::Value::StringValue(val.to_string())),
Value::Array(array) => Some(any_value::Value::ArrayValue(match array {
Array::Bool(vals) => array_into_proto(vals),
Array::I64(vals) => array_into_proto(vals),
Array::F64(vals) => array_into_proto(vals),
Array::String(vals) => array_into_proto(vals),
_ => unreachable!("Nonexistent array type"), // Needs to be updated when new array types are added
})),
_ => unreachable!("Nonexistent value type"), // Needs to be updated when new value types are added
},
}
}

fn array_into_proto<T>(vals: Vec<T>) -> ArrayValue
where
Value: From<T>,
{
let values = vals
.into_iter()
.map(|val| value_to_any_value(Value::from(val)))
.collect();

ArrayValue { values }
}

#[cfg(any(feature = "trace", feature = "logs"))]
pub(crate) fn resource_attributes(resource: &Resource) -> Attributes {
resource
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
.into()
}
}
Loading
Loading