Skip to content

Commit ecc66dd

Browse files
Improve Otlp Delta Aggregation with support for max and Histogram.
1 parent c167402 commit ecc66dd

25 files changed

+1395
-1055
lines changed

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/AggregationTemporality.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,12 @@ public static io.opentelemetry.proto.metrics.v1.AggregationTemporality toOtlpAgg
4848
}
4949
}
5050

51+
static boolean isDelta(io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) {
52+
return aggregationTemporality == io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
53+
}
54+
55+
static boolean isCumulative(io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) {
56+
return aggregationTemporality == io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
57+
}
58+
5159
}

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpCumulativeDistributionSummary.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpCumulativeTimer.java

Lines changed: 0 additions & 88 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.registry.otlp;
17+
18+
import io.micrometer.common.lang.Nullable;
19+
import io.micrometer.core.instrument.AbstractDistributionSummary;
20+
import io.micrometer.core.instrument.Clock;
21+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
22+
import io.micrometer.core.instrument.distribution.StepHistogram;
23+
import io.micrometer.core.instrument.step.StepMax;
24+
import io.micrometer.core.instrument.step.StepTuple2;
25+
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.DoubleAdder;
28+
import java.util.concurrent.atomic.LongAdder;
29+
30+
/**
31+
* A {@link io.micrometer.core.instrument.DistributionSummary} implementation customised
32+
* for OTLP protocol to export data based on the
33+
* {@link io.opentelemetry.proto.metrics.v1.AggregationTemporality} configured. In case of
34+
* Delta aggregation, this uses a {@link StepMax} and {@link StepHistogram} to measure the
35+
* distribution.
36+
*
37+
* @author Tommy Ludwig
38+
* @author Lenin Jaganathan
39+
*/
40+
class OtlpDistributionSummary extends AbstractDistributionSummary implements StartTimeAwareMeter {
41+
42+
private final long startTimeNanos;
43+
44+
private final LongAdder count = new LongAdder();
45+
46+
private final DoubleAdder total = new DoubleAdder();
47+
48+
@Nullable
49+
private final StepTuple2<Long, Double> countTotal;
50+
51+
@Nullable
52+
private final StepMax max;
53+
54+
OtlpDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig, double scale,
55+
long stepMillis, io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) {
56+
super(id, scale,
57+
OtlpMeterRegistry.getHistogram(clock, stepMillis, distributionStatisticConfig, aggregationTemporality));
58+
59+
this.startTimeNanos = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());
60+
countTotal = AggregationTemporality.isDelta(aggregationTemporality)
61+
? new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset) : null;
62+
max = AggregationTemporality.isDelta(aggregationTemporality) ? new StepMax(clock, stepMillis) : null;
63+
}
64+
65+
@Override
66+
protected void recordNonNegative(double amount) {
67+
count.add(1);
68+
total.add(amount);
69+
if (max != null) {
70+
max.record(amount);
71+
}
72+
}
73+
74+
@Override
75+
public long count() {
76+
return countTotal == null ? count.longValue() : countTotal.poll1();
77+
}
78+
79+
@Override
80+
public double totalAmount() {
81+
return countTotal == null ? total.doubleValue() : countTotal.poll2();
82+
}
83+
84+
@Override
85+
public double max() {
86+
return max == null ? 0 : max.poll();
87+
}
88+
89+
@Override
90+
public long getStartTimeNanos() {
91+
return startTimeNanos;
92+
}
93+
94+
}

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.opentelemetry.proto.metrics.v1.*;
4343
import io.opentelemetry.proto.resource.v1.Resource;
4444

45+
import java.time.Duration;
4546
import java.util.ArrayList;
4647
import java.util.List;
4748
import java.util.Map;
@@ -150,19 +151,15 @@ protected Counter newCounter(Meter.Id id) {
150151
@Override
151152
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
152153
PauseDetector pauseDetector) {
153-
return isCumulative()
154-
? new OtlpCumulativeTimer(id, this.clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit())
155-
: new OtlpStepTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
156-
config.step().toMillis());
154+
return new OtlpTimer(id, clock, distributionStatisticConfig, config.step().toMillis(), pauseDetector,
155+
getBaseTimeUnit(), otlpAggregationTemporality);
157156
}
158157

159158
@Override
160159
protected DistributionSummary newDistributionSummary(Meter.Id id,
161160
DistributionStatisticConfig distributionStatisticConfig, double scale) {
162-
return isCumulative()
163-
? new OtlpCumulativeDistributionSummary(id, this.clock, distributionStatisticConfig, scale, true)
164-
: new OtlpStepDistributionSummary(id, clock, distributionStatisticConfig, scale,
165-
config.step().toMillis());
161+
return new OtlpDistributionSummary(id, clock, distributionStatisticConfig, scale, config.step().toMillis(),
162+
otlpAggregationTemporality);
166163
}
167164

168165
@Override
@@ -389,4 +386,32 @@ Iterable<KeyValue> getResourceAttributes() {
389386
return attributes;
390387
}
391388

389+
static io.micrometer.core.instrument.distribution.Histogram getHistogram(Clock clock, long stepMillis,
390+
DistributionStatisticConfig distributionStatisticConfig,
391+
io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) {
392+
// While publishing to OTLP, we export either Histogram datapoint / Summary
393+
// datapoint. So, we will make the histogram either of them and not both.
394+
// Though AbstractTimer/Distribution Summary prefers publishing percentiles,
395+
// exporting of histograms over percentiles is preferred in OTLP.
396+
if (distributionStatisticConfig.isPublishingHistogram()) {
397+
if (AggregationTemporality.isCumulative(aggregationTemporality)) {
398+
return new TimeWindowFixedBoundaryHistogram(clock, DistributionStatisticConfig.builder()
399+
// effectively never roll over
400+
.expiry(Duration.ofDays(1825))
401+
.percentiles()
402+
.bufferLength(1)
403+
.build()
404+
.merge(distributionStatisticConfig), true, false);
405+
}
406+
else if (AggregationTemporality.isDelta(aggregationTemporality)) {
407+
return new StepHistogram(clock, stepMillis, distributionStatisticConfig);
408+
}
409+
}
410+
411+
if (distributionStatisticConfig.isPublishingPercentiles()) {
412+
return new TimeWindowPercentileHistogram(clock, distributionStatisticConfig, false);
413+
}
414+
return NoopHistogram.INSTANCE;
415+
}
416+
392417
}

0 commit comments

Comments
 (0)