-
Notifications
You must be signed in to change notification settings - Fork 717
Implement PoC of MeasurementProcessor proposal #4642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1afaad4
664b741
e446a30
463d343
67629e8
b3c95d3
732238b
6e02f9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
# OpenTelemetry Python - MeasurementProcessor Implementation | ||
|
||
This implementation adds support for **MeasurementProcessor** to the OpenTelemetry Python SDK, following the [OpenTelemetry Specification PR #4318](https://github.com/open-telemetry/opentelemetry-specification/pull/4318). | ||
|
||
## Overview | ||
|
||
The MeasurementProcessor allows you to process measurements before they are aggregated and exported. This enables powerful use cases such as: | ||
|
||
- **Dynamic injection of additional attributes** to measurements based on Context (e.g., from Baggage) | ||
- **Dropping attributes** (e.g., removing sensitive information) | ||
- **Dropping individual measurements** (e.g., filtering invalid values) | ||
- **Modifying measurements** (e.g., unit conversion, value transformation) | ||
|
||
## Key Features | ||
|
||
### Chain-of-Responsibility Pattern | ||
|
||
Unlike existing processors in OpenTelemetry (SpanProcessor, LogRecordProcessor), MeasurementProcessor uses a **chain-of-responsibility pattern** where each processor is responsible for calling the next processor in the chain. This gives processors fine-grained control over the processing flow. | ||
|
||
### High Performance | ||
|
||
The implementation is designed for high-performance scenarios: | ||
|
||
- Minimal overhead when no processors are configured | ||
- Efficient processor chaining using closures | ||
- No unnecessary object creation in the hot path | ||
|
||
## Architecture | ||
|
||
``` | ||
Measurement → Processor 1 → Processor 2 → ... → Processor N → Aggregation | ||
``` | ||
|
||
Each processor can: | ||
|
||
1. **Pass through unchanged**: `next_processor(measurement)` | ||
2. **Modify and pass**: `next_processor(modified_measurement)` | ||
3. **Drop measurement**: Simply don't call `next_processor` | ||
4. **Split into multiple**: Call `next_processor` multiple times | ||
|
||
## Usage | ||
|
||
### Basic Setup | ||
|
||
```python | ||
from opentelemetry.sdk.metrics import MeterProvider | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import ( | ||
BaggageMeasurementProcessor, | ||
StaticAttributeMeasurementProcessor, | ||
ValueRangeMeasurementProcessor, | ||
) | ||
|
||
# Create measurement processors | ||
processors = [ | ||
BaggageMeasurementProcessor(), # Add baggage as attributes | ||
StaticAttributeMeasurementProcessor({"env": "prod"}), # Add static attributes | ||
ValueRangeMeasurementProcessor(min_value=0), # Drop negative values | ||
] | ||
|
||
# Configure MeterProvider with processors | ||
meter_provider = MeterProvider( | ||
measurement_processors=processors, | ||
# ... other configuration | ||
) | ||
``` | ||
|
||
### Built-in Processors | ||
|
||
#### 1. BaggageMeasurementProcessor | ||
|
||
Extracts values from OpenTelemetry Baggage and adds them as measurement attributes, enabling end-to-end telemetry correlation. | ||
|
||
```python | ||
# Add all baggage as attributes with "baggage." prefix | ||
processor = BaggageMeasurementProcessor() | ||
|
||
# Add only specific baggage keys | ||
processor = BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]) | ||
``` | ||
|
||
#### 2. AttributeFilterMeasurementProcessor | ||
|
||
Removes specific attributes from measurements (useful for removing sensitive data). | ||
|
||
```python | ||
processor = AttributeFilterMeasurementProcessor([ | ||
"password", "secret", "auth_token" | ||
]) | ||
``` | ||
|
||
#### 3. StaticAttributeMeasurementProcessor | ||
|
||
Adds static attributes to all measurements. | ||
|
||
```python | ||
processor = StaticAttributeMeasurementProcessor({ | ||
"environment": "production", | ||
"service": "api-server", | ||
"version": "1.0.0" | ||
}) | ||
``` | ||
|
||
### Custom Processors | ||
|
||
Create custom processors by implementing the `MeasurementProcessor` interface: | ||
|
||
```python | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import MeasurementProcessor | ||
from opentelemetry.sdk.metrics._internal.measurement import Measurement | ||
from dataclasses import replace | ||
from typing import Callable | ||
|
||
class CustomMeasurementProcessor(MeasurementProcessor): | ||
def process( | ||
self, | ||
measurement: Measurement, | ||
next_processor: Callable[[Measurement], None] | ||
) -> None: | ||
# Example: Add timestamp attribute | ||
new_attributes = dict(measurement.attributes or {}) | ||
new_attributes["processed_at"] = str(int(time.time())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should show example of adding timestamp as a Metric attribute - is there any valid use-case for it? |
||
|
||
modified_measurement = replace(measurement, attributes=new_attributes) | ||
next_processor(modified_measurement) | ||
|
||
# Unit conversion processor | ||
class MetersToFeetProcessor(MeasurementProcessor): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this! A simpler one would be to do sec->msec. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Umm, I'm not sure. Perhaps someone with more experience with Metrics spec could answer this question. |
||
def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: | ||
if measurement.instrument.name.endswith("_meters"): | ||
# Convert meters to feet | ||
feet_value = measurement.value * 3.28084 | ||
new_measurement = replace(measurement, value=feet_value) | ||
next_processor(new_measurement) | ||
else: | ||
next_processor(measurement) | ||
``` | ||
|
||
## Integration with Existing Metrics SDK | ||
|
||
The MeasurementProcessor integrates seamlessly with the existing metrics SDK: | ||
|
||
1. **SdkConfiguration** - Extended to include `measurement_processor_chain` | ||
2. **MeasurementConsumer** - Modified to process measurements through the processor chain | ||
3. **MeterProvider** - Extended constructor to accept `measurement_processors` parameter | ||
|
||
### Configuration Flow | ||
|
||
``` | ||
MeterProvider(measurement_processors=[...]) | ||
↓ | ||
SdkConfiguration(measurement_processor_chain=...) | ||
↓ | ||
SynchronousMeasurementConsumer(sdk_config) | ||
↓ | ||
MeasurementProcessorChain.process(measurement, final_consumer) | ||
``` | ||
|
||
## Advanced Examples | ||
|
||
### Baggage-based Attribute Injection | ||
|
||
```python | ||
from opentelemetry import baggage | ||
from opentelemetry.context import attach, detach | ||
|
||
# Set baggage in context | ||
ctx = baggage.set_baggage("user.id", "12345") | ||
ctx = baggage.set_baggage("tenant.id", "acme-corp", context=ctx) | ||
token = attach(ctx) | ||
|
||
try: | ||
# This measurement will automatically get baggage.user.id and baggage.tenant.id attributes | ||
counter.add(1, {"operation": "login"}) | ||
finally: | ||
detach(token) | ||
``` | ||
|
||
### Complex Processing Chain | ||
|
||
```python | ||
processors = [ | ||
# 1. Add baggage for correlation | ||
BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]), | ||
|
||
# 2. Add environment info | ||
StaticAttributeMeasurementProcessor({ | ||
"environment": "production", | ||
"datacenter": "us-west-2" | ||
}), | ||
|
||
# 3. Remove sensitive attributes | ||
AttributeFilterMeasurementProcessor(["password", "secret", "token"]), | ||
|
||
# 4. Custom processing | ||
CustomTimestampProcessor(), | ||
] | ||
``` | ||
|
||
### Error Handling | ||
|
||
Processors should handle errors gracefully to avoid breaking the metrics pipeline: | ||
|
||
```python | ||
class SafeProcessor(MeasurementProcessor): | ||
def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: | ||
try: | ||
# Custom processing logic | ||
processed_measurement = self.transform(measurement) | ||
next_processor(processed_measurement) | ||
except Exception as e: | ||
# Log error but don't break the pipeline | ||
logger.warning(f"Processor error: {e}") | ||
# Pass through original measurement | ||
next_processor(measurement) | ||
``` | ||
|
||
--- | ||
|
||
**Note**: This implementation is experimental and the API may change based on community feedback and the final OpenTelemetry specification. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
Example demonstrating the use of MeasurementProcessor with OpenTelemetry Python SDK. | ||
|
||
This example shows how to: | ||
1. Create custom measurement processors | ||
2. Chain multiple processors together | ||
3. Integrate with MeterProvider | ||
4. Use the provided utility processors | ||
""" | ||
|
||
import time | ||
from typing import Callable | ||
|
||
from opentelemetry import baggage, metrics | ||
from opentelemetry.context import attach, detach | ||
from opentelemetry.sdk.metrics import MeterProvider | ||
from opentelemetry.sdk.metrics._internal.measurement import Measurement | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import ( | ||
AttributeFilterMeasurementProcessor, | ||
BaggageMeasurementProcessor, | ||
MeasurementProcessor, | ||
StaticAttributeMeasurementProcessor, | ||
) | ||
from opentelemetry.sdk.metrics.export import ( | ||
ConsoleMetricExporter, | ||
PeriodicExportingMetricReader, | ||
) | ||
|
||
|
||
class CustomMeasurementProcessor(MeasurementProcessor): | ||
"""Example of a custom measurement processor that adds a timestamp attribute.""" | ||
|
||
def process( | ||
self, | ||
measurement: Measurement, | ||
next_processor: Callable[[Measurement], None], | ||
) -> None: | ||
# Add current timestamp as an attribute | ||
from dataclasses import replace | ||
|
||
new_attributes = dict(measurement.attributes or {}) | ||
new_attributes["processed_at"] = str(int(time.time())) | ||
|
||
new_measurement = replace(measurement, attributes=new_attributes) | ||
next_processor(new_measurement) | ||
|
||
|
||
def main(): | ||
print("=== OpenTelemetry MeasurementProcessor Demo ===\n") | ||
|
||
# Create measurement processors | ||
processors = [ | ||
# Add baggage values as attributes (for correlation) | ||
BaggageMeasurementProcessor(), | ||
# Add static environment attributes | ||
StaticAttributeMeasurementProcessor( | ||
{"environment": "demo", "service": "measurement-processor-example"} | ||
), | ||
# Filter out any potentially sensitive attributes | ||
AttributeFilterMeasurementProcessor(["password", "secret"]), | ||
# Add custom processing | ||
CustomMeasurementProcessor(), | ||
] | ||
|
||
# Create metrics export pipeline | ||
console_exporter = ConsoleMetricExporter() | ||
reader = PeriodicExportingMetricReader( | ||
exporter=console_exporter, | ||
export_interval_millis=5000, # Export every 5 seconds | ||
) | ||
|
||
# Create MeterProvider with measurement processors | ||
meter_provider = MeterProvider( | ||
metric_readers=[reader], measurement_processors=processors | ||
) | ||
metrics.set_meter_provider(meter_provider) | ||
|
||
# Get meter and create instruments | ||
meter = metrics.get_meter(__name__) | ||
request_counter = meter.create_counter( | ||
"requests_total", description="Total number of requests" | ||
) | ||
response_time_histogram = meter.create_histogram( | ||
"response_time_seconds", description="Response time in seconds" | ||
) | ||
|
||
print("Recording measurements with different scenarios...\n") | ||
|
||
# Scenario 1: Regular measurement with baggage | ||
print("1. Recording with baggage context...") | ||
ctx = baggage.set_baggage("user.id", "12345") | ||
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) | ||
token = attach(ctx) | ||
|
||
try: | ||
request_counter.add( | ||
1, {"http.route": "/api/users", "http.request.method": "GET"} | ||
) | ||
response_time_histogram.record( | ||
0.150, | ||
{"http.route": "/api/users", "http.response.status_code": 200}, | ||
) | ||
finally: | ||
detach(token) | ||
|
||
# Scenario 2: Measurement with filtered attributes | ||
print("2. Recording with attributes that should be filtered...") | ||
request_counter.add( | ||
1, | ||
{ | ||
"http.route": "/api/login", | ||
"http.request.method": "POST", | ||
"password": "should-be-filtered", # This will be filtered out | ||
"username": "alice", | ||
}, | ||
) | ||
|
||
# Scenario 3: Valid measurement without baggage | ||
print("3. Recording normal measurement...\n") | ||
request_counter.add( | ||
2, {"http.route": "/api/products", "http.request.method": "GET"} | ||
) | ||
response_time_histogram.record( | ||
0.075, | ||
{"http.route": "/api/products", "http.response.status_code": 200}, | ||
) | ||
|
||
print( | ||
"Waiting for metrics to be exported... (Check the console output above for processed measurements)\n" | ||
) | ||
|
||
# Wait a bit for export | ||
time.sleep(6) | ||
|
||
# Cleanup | ||
meter_provider.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Uh oh!
There was an error while loading. Please reload this page.