diff --git a/docs/examples/metrics/measurement-processors/README.md b/docs/examples/metrics/measurement-processors/README.md new file mode 100644 index 0000000000..1cc01e449e --- /dev/null +++ b/docs/examples/metrics/measurement-processors/README.md @@ -0,0 +1,219 @@ +# OpenTelemetry Python - MeasurementProcessor Implementation + +This implementation adds support for **MeasurementProcessor** to the OpenTelemetry Python SDK, following the [OpenTelemetry Specification PR #4318](https://github.com/open-telemetry/opentelemetry-specification/pull/4318). + +## Overview + +The MeasurementProcessor allows you to process measurements before they are aggregated and exported. This enables powerful use cases such as: + +- **Dynamic injection of additional attributes** to measurements based on Context (e.g., from Baggage) +- **Dropping attributes** (e.g., removing sensitive information) +- **Dropping individual measurements** (e.g., filtering invalid values) +- **Modifying measurements** (e.g., unit conversion, value transformation) + +## Key Features + +### Chain-of-Responsibility Pattern + +Unlike existing processors in OpenTelemetry (SpanProcessor, LogRecordProcessor), MeasurementProcessor uses a **chain-of-responsibility pattern** where each processor is responsible for calling the next processor in the chain. This gives processors fine-grained control over the processing flow. + +### High Performance + +The implementation is designed for high-performance scenarios: + +- Minimal overhead when no processors are configured +- Efficient processor chaining using closures +- No unnecessary object creation in the hot path + +## Architecture + +``` +Measurement → Processor 1 → Processor 2 → ... → Processor N → Aggregation +``` + +Each processor can: + +1. **Pass through unchanged**: `next_processor(measurement)` +2. **Modify and pass**: `next_processor(modified_measurement)` +3. **Drop measurement**: Simply don't call `next_processor` +4. **Split into multiple**: Call `next_processor` multiple times + +## Usage + +### Basic Setup + +```python +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + BaggageMeasurementProcessor, + StaticAttributeMeasurementProcessor, + ValueRangeMeasurementProcessor, +) + +# Create measurement processors +processors = [ + BaggageMeasurementProcessor(), # Add baggage as attributes + StaticAttributeMeasurementProcessor({"env": "prod"}), # Add static attributes + ValueRangeMeasurementProcessor(min_value=0), # Drop negative values +] + +# Configure MeterProvider with processors +meter_provider = MeterProvider( + measurement_processors=processors, + # ... other configuration +) +``` + +### Built-in Processors + +#### 1. BaggageMeasurementProcessor + +Extracts values from OpenTelemetry Baggage and adds them as measurement attributes, enabling end-to-end telemetry correlation. + +```python +# Add all baggage as attributes with "baggage." prefix +processor = BaggageMeasurementProcessor() + +# Add only specific baggage keys +processor = BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]) +``` + +#### 2. AttributeFilterMeasurementProcessor + +Removes specific attributes from measurements (useful for removing sensitive data). + +```python +processor = AttributeFilterMeasurementProcessor([ + "password", "secret", "auth_token" +]) +``` + +#### 3. StaticAttributeMeasurementProcessor + +Adds static attributes to all measurements. + +```python +processor = StaticAttributeMeasurementProcessor({ + "environment": "production", + "service": "api-server", + "version": "1.0.0" +}) +``` + +### Custom Processors + +Create custom processors by implementing the `MeasurementProcessor` interface: + +```python +from opentelemetry.sdk.metrics._internal.measurement_processor import MeasurementProcessor +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from dataclasses import replace +from typing import Callable + +class CustomMeasurementProcessor(MeasurementProcessor): + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None] + ) -> None: + # Example: Add timestamp attribute + new_attributes = dict(measurement.attributes or {}) + new_attributes["processed_at"] = str(int(time.time())) + + modified_measurement = replace(measurement, attributes=new_attributes) + next_processor(modified_measurement) + +# Unit conversion processor +class MetersToFeetProcessor(MeasurementProcessor): + def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: + if measurement.instrument.name.endswith("_meters"): + # Convert meters to feet + feet_value = measurement.value * 3.28084 + new_measurement = replace(measurement, value=feet_value) + next_processor(new_measurement) + else: + next_processor(measurement) +``` + +## Integration with Existing Metrics SDK + +The MeasurementProcessor integrates seamlessly with the existing metrics SDK: + +1. **SdkConfiguration** - Extended to include `measurement_processor_chain` +2. **MeasurementConsumer** - Modified to process measurements through the processor chain +3. **MeterProvider** - Extended constructor to accept `measurement_processors` parameter + +### Configuration Flow + +``` +MeterProvider(measurement_processors=[...]) + ↓ +SdkConfiguration(measurement_processor_chain=...) + ↓ +SynchronousMeasurementConsumer(sdk_config) + ↓ +MeasurementProcessorChain.process(measurement, final_consumer) +``` + +## Advanced Examples + +### Baggage-based Attribute Injection + +```python +from opentelemetry import baggage +from opentelemetry.context import attach, detach + +# Set baggage in context +ctx = baggage.set_baggage("user.id", "12345") +ctx = baggage.set_baggage("tenant.id", "acme-corp", context=ctx) +token = attach(ctx) + +try: + # This measurement will automatically get baggage.user.id and baggage.tenant.id attributes + counter.add(1, {"operation": "login"}) +finally: + detach(token) +``` + +### Complex Processing Chain + +```python +processors = [ + # 1. Add baggage for correlation + BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]), + + # 2. Add environment info + StaticAttributeMeasurementProcessor({ + "environment": "production", + "datacenter": "us-west-2" + }), + + # 3. Remove sensitive attributes + AttributeFilterMeasurementProcessor(["password", "secret", "token"]), + + # 4. Custom processing + CustomTimestampProcessor(), +] +``` + +### Error Handling + +Processors should handle errors gracefully to avoid breaking the metrics pipeline: + +```python +class SafeProcessor(MeasurementProcessor): + def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: + try: + # Custom processing logic + processed_measurement = self.transform(measurement) + next_processor(processed_measurement) + except Exception as e: + # Log error but don't break the pipeline + logger.warning(f"Processor error: {e}") + # Pass through original measurement + next_processor(measurement) +``` + +--- + +**Note**: This implementation is experimental and the API may change based on community feedback and the final OpenTelemetry specification. diff --git a/docs/examples/metrics/measurement-processors/measurement_processors.py b/docs/examples/metrics/measurement-processors/measurement_processors.py new file mode 100644 index 0000000000..de88f24200 --- /dev/null +++ b/docs/examples/metrics/measurement-processors/measurement_processors.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 + +""" +Example demonstrating the use of MeasurementProcessor with OpenTelemetry Python SDK. + +This example shows how to: +1. Create custom measurement processors +2. Chain multiple processors together +3. Integrate with MeterProvider +4. Use the provided utility processors +""" + +import time +from typing import Callable + +from opentelemetry import baggage, metrics +from opentelemetry.context import attach, detach +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + AttributeFilterMeasurementProcessor, + BaggageMeasurementProcessor, + MeasurementProcessor, + StaticAttributeMeasurementProcessor, +) +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, +) + + +class CustomMeasurementProcessor(MeasurementProcessor): + """Example of a custom measurement processor that adds a timestamp attribute.""" + + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None], + ) -> None: + # Add current timestamp as an attribute + from dataclasses import replace + + new_attributes = dict(measurement.attributes or {}) + new_attributes["processed_at"] = str(int(time.time())) + + new_measurement = replace(measurement, attributes=new_attributes) + next_processor(new_measurement) + + +def main(): + print("=== OpenTelemetry MeasurementProcessor Demo ===\n") + + # Create measurement processors + processors = [ + # Add baggage values as attributes (for correlation) + BaggageMeasurementProcessor(), + # Add static environment attributes + StaticAttributeMeasurementProcessor( + {"environment": "demo", "service": "measurement-processor-example"} + ), + # Filter out any potentially sensitive attributes + AttributeFilterMeasurementProcessor(["password", "secret"]), + # Add custom processing + CustomMeasurementProcessor(), + ] + + # Create metrics export pipeline + console_exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader( + exporter=console_exporter, + export_interval_millis=5000, # Export every 5 seconds + ) + + # Create MeterProvider with measurement processors + meter_provider = MeterProvider( + metric_readers=[reader], measurement_processors=processors + ) + metrics.set_meter_provider(meter_provider) + + # Get meter and create instruments + meter = metrics.get_meter(__name__) + request_counter = meter.create_counter( + "requests_total", description="Total number of requests" + ) + response_time_histogram = meter.create_histogram( + "response_time_seconds", description="Response time in seconds" + ) + + print("Recording measurements with different scenarios...\n") + + # Scenario 1: Regular measurement with baggage + print("1. Recording with baggage context...") + ctx = baggage.set_baggage("user.id", "12345") + ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) + token = attach(ctx) + + try: + request_counter.add( + 1, {"http.route": "/api/users", "http.request.method": "GET"} + ) + response_time_histogram.record( + 0.150, + {"http.route": "/api/users", "http.response.status_code": 200}, + ) + finally: + detach(token) + + # Scenario 2: Measurement with filtered attributes + print("2. Recording with attributes that should be filtered...") + request_counter.add( + 1, + { + "http.route": "/api/login", + "http.request.method": "POST", + "password": "should-be-filtered", # This will be filtered out + "username": "alice", + }, + ) + + # Scenario 3: Valid measurement without baggage + print("3. Recording normal measurement...\n") + request_counter.add( + 2, {"http.route": "/api/products", "http.request.method": "GET"} + ) + response_time_histogram.record( + 0.075, + {"http.route": "/api/products", "http.response.status_code": 200}, + ) + + print( + "Waiting for metrics to be exported... (Check the console output above for processed measurements)\n" + ) + + # Wait a bit for export + time.sleep(6) + + # Cleanup + meter_provider.shutdown() + + +if __name__ == "__main__": + main() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce..86afa2f3d1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -58,6 +58,10 @@ MeasurementConsumer, SynchronousMeasurementConsumer, ) +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + MeasurementProcessor, + MeasurementProcessorChain, +) from opentelemetry.sdk.metrics._internal.sdk_configuration import ( SdkConfiguration, ) @@ -418,12 +422,19 @@ def __init__( exemplar_filter: Optional[ExemplarFilter] = None, shutdown_on_exit: bool = True, views: Sequence["opentelemetry.sdk.metrics.view.View"] = (), + measurement_processors: Sequence[MeasurementProcessor] = (), ): self._lock = Lock() self._meter_lock = Lock() self._atexit_handler = None if resource is None: resource = Resource.create({}) + measurement_processor_chain = None + if measurement_processors: + measurement_processor_chain = MeasurementProcessorChain( + list(measurement_processors) + ) + self._sdk_config = SdkConfiguration( exemplar_filter=( exemplar_filter @@ -434,6 +445,7 @@ def __init__( resource=resource, metric_readers=metric_readers, views=views, + measurement_processor_chain=measurement_processor_chain, ) self._measurement_consumer = SynchronousMeasurementConsumer( sdk_config=self._sdk_config diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051..fbcbfea626 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -26,6 +26,9 @@ from opentelemetry.metrics._internal.instrument import CallbackOptions from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + MeasurementProcessorChain, +) from opentelemetry.sdk.metrics._internal.metric_reader_storage import ( MetricReaderStorage, ) @@ -62,6 +65,10 @@ def __init__( ) -> None: self._lock = Lock() self._sdk_config = sdk_config + self._measurement_processor_chain = ( + sdk_config.measurement_processor_chain + or MeasurementProcessorChain() + ) # should never be mutated self._reader_storages: Mapping[ "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage @@ -78,18 +85,22 @@ def __init__( ] = [] def consume_measurement(self, measurement: Measurement) -> None: - should_sample_exemplar = ( - self._sdk_config.exemplar_filter.should_sample( - measurement.value, - measurement.time_unix_nano, - measurement.attributes, - measurement.context, - ) - ) - for reader_storage in self._reader_storages.values(): - reader_storage.consume_measurement( - measurement, should_sample_exemplar + def final_consumer(processed_measurement: Measurement) -> None: + should_sample_exemplar = ( + self._sdk_config.exemplar_filter.should_sample( + processed_measurement.value, + processed_measurement.time_unix_nano, + processed_measurement.attributes, + processed_measurement.context, + ) ) + for reader_storage in self._reader_storages.values(): + reader_storage.consume_measurement( + processed_measurement, should_sample_exemplar + ) + + # Process the measurement through the processor chain + self._measurement_processor_chain.process(measurement, final_consumer) def register_asynchronous_instrument( self, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_processor.py new file mode 100644 index 0000000000..b89870c227 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_processor.py @@ -0,0 +1,215 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from dataclasses import replace +from typing import Callable, List, Optional + +from opentelemetry.baggage import get_all +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.util.types import Attributes + + +class MeasurementProcessor(ABC): + """Interface for processing measurements in a chain-of-responsibility pattern. + + MeasurementProcessor allows implementing custom processing logic for measurements + including: + - Dynamic injection of additional attributes based on Context + - Dropping attributes + - Dropping individual measurements + - Modifying measurements + + Each processor in the chain is responsible for calling the next processor. + """ + + @abstractmethod + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None], + ) -> None: + """Process a measurement and optionally call the next processor in the chain. + + Args: + measurement: The measurement to process + next_processor: Callable to invoke the next processor in the chain. + Processors MUST call this to continue the chain unless + they explicitly want to drop the measurement. + """ + + +class MeasurementProcessorChain: + """Manages a chain of MeasurementProcessors. + + This class maintains a list of processors and provides a way to execute + them in sequence using the chain-of-responsibility pattern. + """ + + def __init__( + self, processors: Optional[List[MeasurementProcessor]] = None + ): + """Initialize the processor chain. + + Args: + processors: List of processors to include in the chain. + If None or empty, uses NoOpMeasurementProcessor. + """ + self._processors: List[MeasurementProcessor] = processors or [] + + def process( + self, + measurement: Measurement, + final_consumer: Callable[[Measurement], None], + ) -> None: + """Process a measurement through the entire chain. + + Args: + measurement: The measurement to process + final_consumer: The final consumer that will handle the measurement + after all processors have been applied + """ + if not self._processors: + final_consumer(measurement) + return + + def create_next_step(index: int) -> Callable[[Measurement], None]: + """Create the next step function for the processor at the given index.""" + if index >= len(self._processors): + return final_consumer + + def next_step(processed_measurement: Measurement) -> None: + if index + 1 < len(self._processors): + self._processors[index + 1].process( + processed_measurement, create_next_step(index + 1) + ) + else: + final_consumer(processed_measurement) + + return next_step + + # Start the chain with the first processor + self._processors[0].process(measurement, create_next_step(0)) + + +# Example implementations, should probably be moved to https://github.com/open-telemetry/opentelemetry-python-contrib +class BaggageMeasurementProcessor(MeasurementProcessor): + """Processor that adds baggage values as measurement attributes. + + This processor extracts values from OpenTelemetry baggage and adds them + as attributes to the measurement, enabling end-to-end telemetry correlation. + """ + + def __init__(self, baggage_keys: Optional[List[str]] = None): + """Initialize the baggage processor. + + Args: + baggage_keys: List of specific baggage keys to extract. + If None, extracts all baggage keys. + """ + self._baggage_keys = baggage_keys + + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None], + ) -> None: + """Add baggage values as measurement attributes.""" + try: + # Get all baggage from the measurement context + baggage = get_all(measurement.context) + + if baggage: + # Create new attributes by merging existing with baggage + new_attributes = dict(measurement.attributes or {}) + + for key, value in baggage.items(): + # Filter by specific keys if provided + if self._baggage_keys is None or key in self._baggage_keys: + new_attributes[f"baggage.{key}"] = str(value) + + # Create a new measurement with updated attributes + new_measurement = replace( + measurement, attributes=new_attributes + ) + next_processor(new_measurement) + else: + # No baggage, pass through unchanged + next_processor(measurement) + except Exception: + # On any error, pass through unchanged + next_processor(measurement) + + +class AttributeFilterMeasurementProcessor(MeasurementProcessor): + """Processor that filters out specific attributes from measurements.""" + + def __init__(self, excluded_attributes: List[str]): + """Initialize the attribute filter processor. + + Args: + excluded_attributes: List of attribute keys to remove from measurements. + """ + self._excluded_attributes = set(excluded_attributes) + + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None], + ) -> None: + """Remove specified attributes from the measurement.""" + if not measurement.attributes or not self._excluded_attributes: + next_processor(measurement) + return + + # Filter out excluded attributes + new_attributes = { + key: value + for key, value in measurement.attributes.items() + if key not in self._excluded_attributes + } + + # Create a new measurement with filtered attributes + new_measurement = replace(measurement, attributes=new_attributes) + next_processor(new_measurement) + + +class StaticAttributeMeasurementProcessor(MeasurementProcessor): + """Processor that adds static attributes to all measurements.""" + + def __init__(self, static_attributes: Attributes): + """Initialize the static attribute processor. + + Args: + static_attributes: Dictionary of attributes to add to all measurements. + """ + self._static_attributes = static_attributes or {} + + def process( + self, + measurement: Measurement, + next_processor: Callable[[Measurement], None], + ) -> None: + """Add static attributes to the measurement.""" + if not self._static_attributes: + next_processor(measurement) + return + + # Create new attributes by merging existing with static attributes + new_attributes = dict(measurement.attributes or {}) + new_attributes.update(self._static_attributes) + + # Create a new measurement with updated attributes + new_measurement = replace(measurement, attributes=new_attributes) + next_processor(new_measurement) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index 3d88facb0c..770a52532f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -15,11 +15,14 @@ # pylint: disable=unused-import from dataclasses import dataclass -from typing import Sequence +from typing import Optional, Sequence # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics import opentelemetry.sdk.resources +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + MeasurementProcessorChain, +) @dataclass @@ -28,3 +31,4 @@ class SdkConfiguration: resource: "opentelemetry.sdk.resources.Resource" metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.View"] + measurement_processor_chain: Optional[MeasurementProcessorChain] = None diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_processor.py b/opentelemetry-sdk/tests/metrics/test_measurement_processor.py new file mode 100644 index 0000000000..37746cc788 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_measurement_processor.py @@ -0,0 +1,292 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from time import time_ns +from unittest.mock import Mock + +from opentelemetry.context import get_current +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.measurement_processor import ( + AttributeFilterMeasurementProcessor, + BaggageMeasurementProcessor, + MeasurementProcessor, + MeasurementProcessorChain, + NoOpMeasurementProcessor, + StaticAttributeMeasurementProcessor, + ValueRangeMeasurementProcessor, +) + + +class TestMeasurementProcessorChain(unittest.TestCase): + def test_empty_chain(self): + """Test that an empty chain uses NoOpMeasurementProcessor.""" + chain = MeasurementProcessorChain() + final_consumer = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + chain.process(measurement, final_consumer) + final_consumer.assert_called_once_with(measurement) + + def test_single_processor(self): + """Test chain with a single processor.""" + processor = Mock(spec=MeasurementProcessor) + chain = MeasurementProcessorChain([processor]) + final_consumer = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + # Configure the processor to call next_processor + def mock_process(m, next_proc): + next_proc(m) + + processor.process.side_effect = mock_process + + chain.process(measurement, final_consumer) + processor.process.assert_called_once() + final_consumer.assert_called_once_with(measurement) + + def test_multiple_processors(self): + """Test chain with multiple processors.""" + processor1 = Mock(spec=MeasurementProcessor) + processor2 = Mock(spec=MeasurementProcessor) + chain = MeasurementProcessorChain([processor1, processor2]) + final_consumer = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + # Configure processors to call next_processor + def mock_process1(m, next_proc): + next_proc(m) + + def mock_process2(m, next_proc): + next_proc(m) + + processor1.process.side_effect = mock_process1 + processor2.process.side_effect = mock_process2 + + chain.process(measurement, final_consumer) + processor1.process.assert_called_once() + processor2.process.assert_called_once() + final_consumer.assert_called_once_with(measurement) + + +class TestNoOpMeasurementProcessor(unittest.TestCase): + def test_no_op_processor(self): + """Test that NoOpMeasurementProcessor passes measurements through unchanged.""" + processor = NoOpMeasurementProcessor() + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_called_once_with(measurement) + + +class TestStaticAttributeMeasurementProcessor(unittest.TestCase): + def test_add_static_attributes(self): + """Test adding static attributes to measurements.""" + static_attrs = {"environment": "test", "version": "1.0"} + processor = StaticAttributeMeasurementProcessor(static_attrs) + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"original": "value"}, + ) + + processor.process(measurement, next_processor) + + # Check that next_processor was called with modified measurement + next_processor.assert_called_once() + modified_measurement = next_processor.call_args[0][0] + + # Should have both original and static attributes + expected_attrs = { + "original": "value", + "environment": "test", + "version": "1.0", + } + self.assertEqual(modified_measurement.attributes, expected_attrs) + + def test_empty_static_attributes(self): + """Test with empty static attributes.""" + processor = StaticAttributeMeasurementProcessor({}) + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"original": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_called_once_with(measurement) + + +class TestAttributeFilterMeasurementProcessor(unittest.TestCase): + def test_filter_attributes(self): + """Test filtering out specific attributes.""" + processor = AttributeFilterMeasurementProcessor( + ["sensitive", "internal"] + ) + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={ + "keep": "value", + "sensitive": "secret", + "internal": "data", + "also_keep": "another", + }, + ) + + processor.process(measurement, next_processor) + + # Check that next_processor was called with filtered measurement + next_processor.assert_called_once() + modified_measurement = next_processor.call_args[0][0] + + # Should only have non-filtered attributes + expected_attrs = {"keep": "value", "also_keep": "another"} + self.assertEqual(modified_measurement.attributes, expected_attrs) + + def test_no_attributes_to_filter(self): + """Test with no attributes to filter.""" + processor = AttributeFilterMeasurementProcessor(["nonexistent"]) + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"keep": "value"}, + ) + + processor.process(measurement, next_processor) + + # Should keep original attributes + next_processor.assert_called_once() + modified_measurement = next_processor.call_args[0][0] + self.assertEqual(modified_measurement.attributes, {"keep": "value"}) + + +class TestValueRangeMeasurementProcessor(unittest.TestCase): + def test_value_in_range(self): + """Test that values in range are passed through.""" + processor = ValueRangeMeasurementProcessor(min_value=0, max_value=100) + next_processor = Mock() + measurement = Measurement( + value=50.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_called_once_with(measurement) + + def test_value_below_min(self): + """Test that values below minimum are dropped.""" + processor = ValueRangeMeasurementProcessor(min_value=0, max_value=100) + next_processor = Mock() + measurement = Measurement( + value=-10.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_not_called() + + def test_value_above_max(self): + """Test that values above maximum are dropped.""" + processor = ValueRangeMeasurementProcessor(min_value=0, max_value=100) + next_processor = Mock() + measurement = Measurement( + value=150.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_not_called() + + def test_no_limits(self): + """Test with no minimum or maximum limits.""" + processor = ValueRangeMeasurementProcessor() + next_processor = Mock() + measurement = Measurement( + value=999999.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_called_once_with(measurement) + + +class TestBaggageMeasurementProcessor(unittest.TestCase): + def test_no_baggage(self): + """Test with no baggage in context.""" + processor = BaggageMeasurementProcessor() + next_processor = Mock() + measurement = Measurement( + value=1.0, + time_unix_nano=time_ns(), + instrument=Mock(), + context=get_current(), + attributes={"test": "value"}, + ) + + processor.process(measurement, next_processor) + next_processor.assert_called_once_with(measurement) + + +if __name__ == "__main__": + unittest.main()