diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml
index c68eec6109509..252e1d7672131 100644
--- a/.github/workflows/changes.yml
+++ b/.github/workflows/changes.yml
@@ -143,6 +143,8 @@ on:
       value: ${{ jobs.e2e_tests.outputs.opentelemetry-logs }}
     e2e-opentelemetry-metrics:
       value: ${{ jobs.e2e_tests.outputs.opentelemetry-metrics }}
+    e2e-opentelemetry-traces:
+      value: ${{ jobs.e2e_tests.outputs.opentelemetry-traces }}
     int-tests-any:
       value: ${{ jobs.int_tests.outputs.any }}
     e2e-tests-any:
@@ -403,6 +405,7 @@ jobs:
       datadog-metrics: ${{ steps.filter.outputs.datadog-metrics }}
       opentelemetry-logs: ${{ steps.filter.outputs.opentelemetry-logs }}
       opentelemetry-metrics: ${{ steps.filter.outputs.opentelemetry-metrics }}
+      opentelemetry-traces: ${{ steps.filter.outputs.opentelemetry-traces }}
       any: ${{ steps.detect-changes.outputs.any }}
     steps:
       - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
@@ -434,7 +437,8 @@ jobs:
             "datadog-logs": ${{ steps.filter.outputs.datadog-logs }},
             "datadog-metrics": ${{ steps.filter.outputs.datadog-metrics }},
             "opentelemetry-logs": ${{ steps.filter.outputs.opentelemetry-logs }},
-            "opentelemetry-metrics": ${{ steps.filter.outputs.opentelemetry-metrics }}
+            "opentelemetry-metrics": ${{ steps.filter.outputs.opentelemetry-metrics }},
+            "opentelemetry-traces": ${{ steps.filter.outputs.opentelemetry-traces }}
           }
           EOF
           )
diff --git a/.github/workflows/ci-integration-review.yml b/.github/workflows/ci-integration-review.yml
index 933caab0801ae..e5bb6ad954df0 100644
--- a/.github/workflows/ci-integration-review.yml
+++ b/.github/workflows/ci-integration-review.yml
@@ -136,7 +136,7 @@ jobs:
     strategy:
       matrix:
         service: [
-          "datadog-logs", "datadog-metrics", "opentelemetry-logs", "opentelemetry-metrics"
+          "datadog-logs", "datadog-metrics", "opentelemetry-logs", "opentelemetry-metrics", "opentelemetry-traces"
         ]
     steps:
       - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
@@ -162,7 +162,6 @@ jobs:
           max_attempts: 3
           command: bash scripts/run-integration-test.sh e2e ${{ matrix.service }}
 
-
   update-pr-status:
     name: Signal result to PR
     runs-on: ubuntu-24.04
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 805ee63d39b43..4da8b5357d255 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -136,7 +136,7 @@ jobs:
     if: ${{ !failure() && !cancelled() && (github.event_name == 'merge_group' || github.event_name == 'workflow_dispatch') }}
     strategy:
       matrix:
-        service: [ "datadog-logs", "datadog-metrics", "opentelemetry-logs", "opentelemetry-metrics" ]
+        service: [ "datadog-logs", "datadog-metrics", "opentelemetry-logs", "opentelemetry-metrics", "opentelemetry-traces" ]
     timeout-minutes: 90
 
     steps:
diff --git a/scripts/e2e/opentelemetry-traces/compose.yaml b/scripts/e2e/opentelemetry-traces/compose.yaml
new file mode 100644
index 0000000000000..bf7e533d47af5
--- /dev/null
+++ b/scripts/e2e/opentelemetry-traces/compose.yaml
@@ -0,0 +1,92 @@
+name: opentelemetry-vector-e2e
+services:
+  otel-collector-source:
+    container_name: otel-collector-source
+    image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION}
+    init: true
+    user: "0:0" # test only, override special user with root
+    volumes:
+      - type: bind
+        source: ../../../tests/data/e2e/opentelemetry/traces/collector-source.yaml
+        target: /etc/otelcol-contrib/config.yaml
+        read_only: true
+      - type: volume
+        source: vector_target
+        target: /output
+    ports:
+      - "${OTEL_COLLECTOR_SOURCE_GRPC_PORT:-4317}:4317"
+      - "${OTEL_COLLECTOR_SOURCE_HTTP_PORT:-4318}:4318"
+    command: [ "--config=/etc/otelcol-contrib/config.yaml" ]
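+    # Role (for orientation): telemetrygen sends OTLP/HTTP to this collector,
+    # which fans the same stream out to Vector over both gRPC (4317) and HTTP
+    # (4318) and also writes everything it received to the shared /output
+    # volume for the Rust assertions to read back.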
"${OTEL_COLLECTOR_SOURCE_HTTP_PORT:-4318}:4318" + command: [ "--config=/etc/otelcol-contrib/config.yaml" ] + + traces-generator: + container_name: traces-generator + build: + context: ../../../ + dockerfile: ./scripts/e2e/opentelemetry-common/telemetrygen.Dockerfile + init: true + depends_on: + otel-collector-source: + condition: service_started + vector: + condition: service_started + otel-collector-sink: + condition: service_started + command: + - "-c" + - | + until nc -z otel-collector-source 4318; do + sleep 0.5 + done + telemetrygen traces \ + --otlp-endpoint=otel-collector-source:4318 \ + --otlp-insecure \ + --otlp-http \ + --traces=100 \ + --rate=0 + + otel-collector-sink: + container_name: otel-collector-sink + build: + context: ../../../ + dockerfile: ./scripts/e2e/opentelemetry-common/collector.Dockerfile + args: + CONFIG_COLLECTOR_VERSION: ${CONFIG_COLLECTOR_VERSION} + init: true + user: "0:0" # test only, override special user with root + command: [ "--config", "/etc/otelcol-contrib/config.yaml" ] + volumes: + - type: bind + source: ../../../tests/data/e2e/opentelemetry/traces/collector-sink.yaml + target: /etc/otelcol-contrib/config.yaml + read_only: true + - type: volume + source: vector_target + target: /output + ports: + - "${OTEL_COLLECTOR_SINK_HTTP_PORT:-5318}:5318" + + vector: + container_name: vector-otel-traces-e2e + image: ${CONFIG_VECTOR_IMAGE} + init: true + volumes: + - type: bind + source: ../../../tests/data/e2e/opentelemetry/traces/vector_otlp.yaml + target: /etc/vector/vector.yaml + read_only: true + - type: volume + source: vector_target + target: /output + environment: + - VECTOR_LOG=${VECTOR_LOG:-info} + - FEATURES=e2e-tests-opentelemetry + command: [ "vector", "-c", "/etc/vector/vector.yaml" ] + +volumes: + vector_target: + external: true + +networks: + default: + name: ${VECTOR_NETWORK} + external: true diff --git a/scripts/e2e/opentelemetry-traces/test.yaml b/scripts/e2e/opentelemetry-traces/test.yaml new file mode 100644 index 0000000000000..38c3c457d6d3f --- /dev/null +++ b/scripts/e2e/opentelemetry-traces/test.yaml @@ -0,0 +1,26 @@ +features: + - e2e-tests-opentelemetry + +test: "e2e" + +test_filter: "opentelemetry::traces::" + +runner: + needs_docker_socket: true + env: + OTEL_COLLECTOR_SOURCE_GRPC_PORT: '4317' + OTEL_COLLECTOR_SOURCE_HTTP_PORT: '4318' + OTEL_COLLECTOR_SINK_HTTP_PORT: '5318' + +matrix: + # Determines which `otel/opentelemetry-collector-contrib` version to use + collector_version: [ 'latest' ] + +# Only trigger this integration test if relevant OTEL source/sink files change +paths: + - "src/sources/opentelemetry/**" + - "src/sinks/opentelemetry/**" + - "src/internal_events/opentelemetry_*" + - "tests/e2e/opentelemetry/traces/**" + - "scripts/e2e/opentelemetry-traces/**" + - "lib/codecs/src/**/otlp.rs" diff --git a/tests/data/e2e/opentelemetry/traces/collector-sink.yaml b/tests/data/e2e/opentelemetry/traces/collector-sink.yaml new file mode 100644 index 0000000000000..3dbc477164f28 --- /dev/null +++ b/tests/data/e2e/opentelemetry/traces/collector-sink.yaml @@ -0,0 +1,26 @@ +receivers: + otlp: + protocols: + http: + endpoint: "0.0.0.0:5318" + +processors: + batch: { } + +exporters: + debug: { } + file: + path: /output/opentelemetry-traces/collector-file-exporter.log + rotation: + max_megabytes: 10 + max_days: 1 + +service: + pipelines: + traces: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ debug, file ] + telemetry: + logs: + level: "debug" diff --git a/tests/data/e2e/opentelemetry/traces/collector-source.yaml 
diff --git a/tests/data/e2e/opentelemetry/traces/collector-source.yaml b/tests/data/e2e/opentelemetry/traces/collector-source.yaml
new file mode 100644
index 0000000000000..0422b6cdd5a3b
--- /dev/null
+++ b/tests/data/e2e/opentelemetry/traces/collector-source.yaml
@@ -0,0 +1,32 @@
+receivers:
+  otlp:
+    protocols:
+      http:
+        endpoint: "0.0.0.0:4318"
+      grpc:
+
+processors:
+  batch: { }
+
+exporters:
+  otlp/grpc:
+    endpoint: vector:4317
+    tls:
+      insecure: true
+  otlphttp/vector:
+    endpoint: http://vector:4318
+    tls:
+      insecure: true
+  debug: { }
+  file:
+    path: /output/opentelemetry-traces/collector-source-file-exporter.log
+    rotation:
+      max_megabytes: 10
+      max_days: 1
+
+service:
+  pipelines:
+    traces:
+      receivers: [ otlp ]
+      processors: [ batch ]
+      exporters: [ debug, otlp/grpc, otlphttp/vector, file ]
diff --git a/tests/data/e2e/opentelemetry/traces/vector_otlp.yaml b/tests/data/e2e/opentelemetry/traces/vector_otlp.yaml
new file mode 100644
index 0000000000000..e1af7b10988ee
--- /dev/null
+++ b/tests/data/e2e/opentelemetry/traces/vector_otlp.yaml
@@ -0,0 +1,42 @@
+sources:
+  source0:
+    type: opentelemetry
+    grpc:
+      address: 0.0.0.0:4317
+    http:
+      address: 0.0.0.0:4318
+      keepalive:
+        max_connection_age_jitter_factor: 0.1
+        max_connection_age_secs: 300
+    use_otlp_decoding: true
+
+  internal_metrics:
+    type: internal_metrics
+    scrape_interval_secs: 60
+
+sinks:
+  otel_sink:
+    inputs:
+      - source0.traces
+    type: opentelemetry
+    protocol:
+      type: http
+      uri: http://otel-collector-sink:5318/v1/traces
+      encoding:
+        codec: otlp
+
+  otel_file_sink:
+    type: file
+    path: "/output/opentelemetry-traces/vector-file-sink.log"
+    inputs:
+      - source0.traces
+    encoding:
+      codec: json
+
+  metrics_file_sink:
+    type: file
+    path: "/output/opentelemetry-traces/vector-internal-metrics-sink.log"
+    inputs:
+      - internal_metrics
+    encoding:
+      codec: json
diff --git a/tests/e2e/opentelemetry/mod.rs b/tests/e2e/opentelemetry/mod.rs
index 0ce403ce76429..728ee4c8cd2f1 100644
--- a/tests/e2e/opentelemetry/mod.rs
+++ b/tests/e2e/opentelemetry/mod.rs
@@ -1,5 +1,6 @@
 pub mod logs;
 pub mod metrics;
+pub mod traces;
 
 use std::{io, path::Path, process::Command};
 
diff --git a/tests/e2e/opentelemetry/traces/mod.rs b/tests/e2e/opentelemetry/traces/mod.rs
new file mode 100644
index 0000000000000..510cc12dc5f7b
--- /dev/null
+++ b/tests/e2e/opentelemetry/traces/mod.rs
@@ -0,0 +1,280 @@
+use vector_lib::opentelemetry::proto::TRACES_REQUEST_MESSAGE_TYPE;
+use vector_lib::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
+
+use crate::opentelemetry::{
+    assert_service_name_with, parse_line_to_export_type_request, read_file_helper,
+};
+use base64::prelude::*;
+
+// telemetrygen generates 100 traces, each trace contains exactly 2 spans (parent + child).
+// The collector forwards via both gRPC and HTTP to Vector, so:
+// 100 traces * 2 spans * 2 protocols = 400 spans
+const EXPECTED_SPAN_COUNT: usize = 400;
+
+fn parse_export_traces_request(content: &str) -> Result<ExportTraceServiceRequest, String> {
+    // The file may contain multiple lines, each with a JSON object containing an array of resourceSpans
+    let mut merged_request = ExportTraceServiceRequest {
+        resource_spans: Vec::new(),
+    };
+
+    for (line_num, line) in content.lines().enumerate() {
+        let line = line.trim();
+        if line.is_empty() {
+            continue;
+        }
+
+        // Merge resource_spans from this request into the accumulated result
+        merged_request.resource_spans.extend(
+            parse_line_to_export_type_request::<ExportTraceServiceRequest>(
+                TRACES_REQUEST_MESSAGE_TYPE,
+                line,
+            )
+            .map_err(|e| format!("Line {}: {}", line_num + 1, e))?
+            .resource_spans,
+        );
+    }
+
+    if merged_request.resource_spans.is_empty() {
+        return Err("No resource spans found in file".to_string());
+    }
+
+    Ok(merged_request)
+}
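+
+// Illustrative line shape handled by `parse_export_traces_request` (field names
+// follow the standard OTLP/JSON mapping; the IDs and span name are placeholders):
+//
+//   {"resourceSpans":[{"resource":{"attributes":[...]},"scopeSpans":[
+//     {"spans":[{"traceId":"<32 hex>","spanId":"<16 hex>","name":"..."}]}]}]}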
+
+/// Asserts that all spans have the expected static fields set:
+/// - `name`: must be non-empty
+///
+/// Span `kind` (0 is the legal SPAN_KIND_UNSPECIFIED) and the run-dependent
+/// `*TimeUnixNano` timestamps are intentionally not asserted.
+fn assert_span_static_fields(request: &ExportTraceServiceRequest) {
+    for (rs_idx, rs) in request.resource_spans.iter().enumerate() {
+        for (ss_idx, ss) in rs.scope_spans.iter().enumerate() {
+            for (span_idx, span) in ss.spans.iter().enumerate() {
+                let prefix =
+                    format!("resource_spans[{rs_idx}].scope_spans[{ss_idx}].spans[{span_idx}]");
+
+                // Assert name is not empty
+                assert!(
+                    !span.name.is_empty(),
+                    "{prefix} span name should not be empty"
+                );
+
+                // Span kind and timestamps are deliberately left unchecked; see
+                // the doc comment on this function.
+            }
+        }
+    }
+}
+
+/// Converts a span/trace ID from encoded bytes to raw binary bytes.
+/// The collector outputs IDs as hex strings (e.g., "804ab72eed55cea1"), while
+/// Vector outputs base64 (the standard JSON encoding for binary fields).
+/// Works for both span_id (8 bytes) and trace_id (16 bytes).
+fn decode_span_id(id: &[u8]) -> Vec<u8> {
+    // Check if it's hex-encoded (even length, all ASCII hex characters)
+    if id.len().is_multiple_of(2)
+        && id.len() >= 16
+        && id.iter().all(|&b| {
+            b.is_ascii_digit() || (b'a'..=b'f').contains(&b) || (b'A'..=b'F').contains(&b)
+        })
+    {
+        // It's hex-encoded, decode it
+        return (0..id.len())
+            .step_by(2)
+            .map(|i| {
+                let high = char::from(id[i]).to_digit(16).unwrap() as u8;
+                let low = char::from(id[i + 1]).to_digit(16).unwrap() as u8;
+                (high << 4) | low
+            })
+            .collect();
+    }
+
+    // Check if it's base64-encoded (contains only base64 characters)
+    if id.iter().all(|&b| {
+        b.is_ascii_uppercase()
+            || b.is_ascii_lowercase()
+            || b.is_ascii_digit()
+            || b == b'+'
+            || b == b'/'
+            || b == b'='
+    }) {
+        // Try to decode as base64
+        if let Ok(decoded) = BASE64_STANDARD.decode(id) {
+            return decoded;
+        }
+    }
+
+    // Already binary or unrecognized format
+    id.to_vec()
+}
+
+/// Asserts that the span IDs and trace IDs from the collector and Vector output
+/// match exactly. This verifies that Vector correctly preserves span identity
+/// through the pipeline.
+/// Note: the collector outputs IDs as hex strings and Vector as base64; both
+/// sides are normalized through `decode_span_id` before comparison.
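+/// Illustrative example (hand-computed, not taken from a real run): the 8-byte
+/// ID 0x80 0x4a 0xb7 0x2e 0xed 0x55 0xce 0xa1 appears as the hex string
+/// "804ab72eed55cea1" in collector output and as the base64 string
+/// "gEq3Lu1VzqE=" in Vector's JSON output; `decode_span_id` maps both to the
+/// same `Vec<u8>`, making the `HashSet` comparisons below encoding-agnostic.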
+fn assert_span_ids_match(
+    collector_request: &ExportTraceServiceRequest,
+    vector_request: &ExportTraceServiceRequest,
+) {
+    use std::collections::HashSet;
+
+    // Collect all span IDs from collector output (decode from hex)
+    let collector_span_ids: HashSet<_> = collector_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .map(|span| decode_span_id(&span.span_id))
+        .collect();
+
+    // Collect all span IDs from vector output (decode from base64)
+    let vector_span_ids: HashSet<_> = vector_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .map(|span| decode_span_id(&span.span_id))
+        .collect();
+
+    assert_eq!(
+        collector_span_ids.len(),
+        EXPECTED_SPAN_COUNT / 2,
+        "Collector should have {} unique span IDs",
+        EXPECTED_SPAN_COUNT / 2
+    );
+
+    assert_eq!(
+        vector_span_ids.len(),
+        EXPECTED_SPAN_COUNT / 2,
+        "Vector should have {} unique span IDs",
+        EXPECTED_SPAN_COUNT / 2
+    );
+
+    assert_eq!(
+        collector_span_ids, vector_span_ids,
+        "Span IDs from collector and Vector should match exactly"
+    );
+
+    // Also verify trace IDs match
+    let collector_trace_ids: HashSet<_> = collector_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .map(|span| decode_span_id(&span.trace_id))
+        .collect();
+
+    let vector_trace_ids: HashSet<_> = vector_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .map(|span| decode_span_id(&span.trace_id))
+        .collect();
+
+    assert_eq!(
+        collector_trace_ids, vector_trace_ids,
+        "Trace IDs from collector and Vector should match exactly"
+    );
+}
+
+#[test]
+fn vector_sink_otel_sink_traces_match() {
+    // Read the collector-source output (what telemetrygen sent)
+    let collector_source_content = read_file_helper("traces", "collector-source-file-exporter.log")
+        .expect("Failed to read collector-source file");
+
+    // Read the collector-sink output (what Vector forwarded via OTLP)
+    let collector_sink_content = read_file_helper("traces", "collector-file-exporter.log")
+        .expect("Failed to read collector-sink file");
+
+    let collector_source_request = parse_export_traces_request(&collector_source_content)
+        .expect("Failed to parse collector-source traces as ExportTraceServiceRequest");
+    let collector_sink_request = parse_export_traces_request(&collector_sink_content)
+        .expect("Failed to parse collector-sink traces as ExportTraceServiceRequest");
+
+    // Count total spans
+    let source_span_count = collector_source_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .count();
+
+    let sink_span_count = collector_sink_request
+        .resource_spans
+        .iter()
+        .flat_map(|rs| &rs.scope_spans)
+        .flat_map(|ss| &ss.spans)
+        .count();
+
+    assert_eq!(
+        source_span_count,
+        EXPECTED_SPAN_COUNT / 2, // the source collector sees each span once; the 2x comes from forwarding over both gRPC and HTTP
+        "Collector-source received {source_span_count} spans, expected {}",
+        EXPECTED_SPAN_COUNT / 2
+    );
+
+    assert_eq!(
+        sink_span_count, EXPECTED_SPAN_COUNT,
+        "Collector-sink received {sink_span_count} spans from Vector, expected {EXPECTED_SPAN_COUNT}"
+    );
+
+    // Verify service.name attribute
+    assert_service_name_with(
+        &collector_source_request.resource_spans,
+        "resource_spans",
+        "telemetrygen",
+        |rs| rs.resource.as_ref(),
+    );
+    assert_service_name_with(
+        &collector_sink_request.resource_spans,
+        "resource_spans",
+        "telemetrygen",
+        |rs| rs.resource.as_ref(),
+    );
+
+    // Verify static span fields
+    assert_span_static_fields(&collector_source_request);
+    assert_span_static_fields(&collector_sink_request);
+
+    // Verify span IDs match exactly between source and sink
+    // Both use the collector's file exporter with hex encoding, so they should match perfectly
+    assert_span_ids_match(&collector_source_request, &collector_sink_request);
+
+    // Deduplicate collector-sink data by span_id before comparison
+    // Vector receives the same data via both gRPC and HTTP, so collector-sink has duplicates
+    let mut deduped_sink_request = ExportTraceServiceRequest {
+        resource_spans: Vec::new(),
+    };
+
+    let mut seen_span_ids = std::collections::HashSet::new();
+    for rs in &collector_sink_request.resource_spans {
+        let mut deduped_rs = rs.clone();
+        deduped_rs.scope_spans.clear();
+
+        for ss in &rs.scope_spans {
+            let mut deduped_ss = ss.clone();
+            deduped_ss.spans.clear();
+
+            for span in &ss.spans {
+                let span_id = decode_span_id(&span.span_id);
+                if seen_span_ids.insert(span_id) {
+                    deduped_ss.spans.push(span.clone());
+                }
+            }
+
+            if !deduped_ss.spans.is_empty() {
+                deduped_rs.scope_spans.push(deduped_ss);
+            }
+        }
+
+        if !deduped_rs.scope_spans.is_empty() {
+            deduped_sink_request.resource_spans.push(deduped_rs);
+        }
+    }
+
+    // Compare the full requests to verify Vector correctly forwarded all trace data via OTLP
+    // This tests the complete pipeline: telemetrygen -> collector-source -> Vector -> collector-sink
+    assert_eq!(
+        collector_source_request, deduped_sink_request,
+        "Traces received by collector-source should match deduplicated traces forwarded through Vector to collector-sink"
+    );
+}
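+
+// Focused sanity check for `decode_span_id`; the hex and base64 strings below
+// are hand-computed encodings of the same illustrative 8-byte ID.
+#[test]
+fn decode_span_id_normalizes_hex_and_base64() {
+    let raw: Vec<u8> = vec![0x80, 0x4a, 0xb7, 0x2e, 0xed, 0x55, 0xce, 0xa1];
+    // 16 hex characters decode to the 8 raw bytes
+    assert_eq!(decode_span_id(b"804ab72eed55cea1"), raw);
+    // Base64 of the same bytes decodes to the same 8 raw bytes
+    assert_eq!(decode_span_id(b"gEq3Lu1VzqE="), raw);
+    // Raw binary input (not valid hex/base64 ASCII) passes through unchanged
+    assert_eq!(decode_span_id(&raw), raw);
+}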