Skip to content

Commit 82dff93

Browse files
shakuzenizeyelenin-jaganathanjonatan-ivanov
committed
Backport fix for partial step values on closing publish
This is a combination of the following commits: 1. Manual rollover of step meters for publishing partial step (#3681) (cherry picked from commit 1390610) 2. Polish "Manual rollover of step meters for publishing partial step" (#3704) (cherry picked from commit dfa43ef) 3. Avoid concurrent publish calls from publishSafely (#3714) (cherry picked from commit fddaf12) 4. Do not rollover again after closingRollover (#3730) (cherry picked from commit 4e64b29) 5. Polish "Avoid concurrent publish calls from publishSafely" (#3733) (cherry picked from commit fe07e98) 6. Polish "Do not rollover again after closingRollover" (#3737) (cherry picked from commit f82cdbc) 7. Fix @nullable import since we changed the package name in 1.10.0 Closes gh-3759 See gh-3681 See gh-1882 See gh-3704 See gh-3714 See gh-3711 See gh-3730 See gh-3720 See gh-3733 See gh-3737 Co-authored-by: Tommy Ludwig <[email protected]> Co-authored-by: Johnny Lim <[email protected]> Co-authored-by: Lenin Jaganathan <[email protected]> Co-authored-by: Jonatan Ivanov <[email protected]>
1 parent f022dde commit 82dff93

18 files changed

+634
-40
lines changed

micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
import java.util.concurrent.ScheduledExecutorService;
2727
import java.util.concurrent.ThreadFactory;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
public abstract class PushMeterRegistry extends MeterRegistry {
3132

3233
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PushMeterRegistry.class);
3334

3435
private final PushRegistryConfig config;
3536

37+
private final AtomicBoolean publishing = new AtomicBoolean(false);
38+
3639
@Nullable
3740
private ScheduledExecutorService scheduledExecutorService;
3841

@@ -49,15 +52,34 @@ protected PushMeterRegistry(PushRegistryConfig config, Clock clock) {
4952
/**
5053
* Catch uncaught exceptions thrown from {@link #publish()}.
5154
*/
52-
private void publishSafely() {
53-
try {
54-
publish();
55+
// VisibleForTesting
56+
void publishSafely() {
57+
if (this.publishing.compareAndSet(false, true)) {
58+
try {
59+
publish();
60+
}
61+
catch (Throwable e) {
62+
logger.warn("Unexpected exception thrown while publishing metrics for " + getClass().getSimpleName(),
63+
e);
64+
}
65+
finally {
66+
this.publishing.set(false);
67+
}
5568
}
56-
catch (Throwable e) {
57-
logger.warn("Unexpected exception thrown while publishing metrics for " + getClass().getSimpleName(), e);
69+
else {
70+
logger.warn("Publishing is already in progress. Skipping duplicate call to publish().");
5871
}
5972
}
6073

74+
/**
75+
* Returns if scheduled publishing of metrics is in progress.
76+
* @return if scheduled publishing of metrics is in progress
77+
* @since 1.11.0
78+
*/
79+
protected boolean isPublishing() {
80+
return publishing.get();
81+
}
82+
6183
/**
6284
* @deprecated Use {@link #start(ThreadFactory)} instead.
6385
*/
@@ -92,10 +114,10 @@ public void stop() {
92114

93115
@Override
94116
public void close() {
117+
stop();
95118
if (config.enabled() && !isClosed()) {
96119
publishSafely();
97120
}
98-
stop();
99121
super.close();
100122
}
101123

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepCounter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*
2727
* @author Jon Schneider
2828
*/
29-
public class StepCounter extends AbstractMeter implements Counter {
29+
public class StepCounter extends AbstractMeter implements Counter, StepMeter {
3030

3131
private final StepDouble value;
3232

@@ -45,4 +45,9 @@ public double count() {
4545
return value.poll();
4646
}
4747

48+
@Override
49+
public void _closingRollover() {
50+
value.closingRollover();
51+
}
52+
4853
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepDistributionSummary.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* @author Jon Schneider
3333
* @author Johnny Lim
3434
*/
35-
public class StepDistributionSummary extends AbstractDistributionSummary {
35+
public class StepDistributionSummary extends AbstractDistributionSummary implements StepMeter {
3636

3737
private final LongAdder count = new LongAdder();
3838

@@ -86,4 +86,9 @@ public Iterable<Measurement> measure() {
8686
new Measurement(this::totalAmount, Statistic.TOTAL), new Measurement(this::max, Statistic.MAX));
8787
}
8888

89+
@Override
90+
public void _closingRollover() {
91+
countTotal.closingRollover();
92+
}
93+
8994
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionCounter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.lang.ref.WeakReference;
2323
import java.util.function.ToDoubleFunction;
2424

25-
public class StepFunctionCounter<T> extends AbstractMeter implements FunctionCounter {
25+
public class StepFunctionCounter<T> extends AbstractMeter implements FunctionCounter, StepMeter {
2626

2727
private final WeakReference<T> ref;
2828

@@ -50,4 +50,10 @@ public double count() {
5050
return count.poll();
5151
}
5252

53+
@Override
54+
public void _closingRollover() {
55+
count(); // add any difference from last count
56+
count.closingRollover();
57+
}
58+
5359
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionTimer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* @author Jon Schneider
3333
* @author Johnny Lim
3434
*/
35-
public class StepFunctionTimer<T> implements FunctionTimer {
35+
public class StepFunctionTimer<T> implements FunctionTimer, StepMeter {
3636

3737
private final Id id;
3838

@@ -118,4 +118,10 @@ public Type type() {
118118
return Type.TIMER;
119119
}
120120

121+
@Override
122+
public void _closingRollover() {
123+
accumulateCountAndTotal();
124+
countTotal.closingRollover();
125+
}
126+
121127
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2023 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.core.instrument.step;
17+
18+
/**
19+
* Internal. Intentionally package-private.
20+
*/
21+
interface StepMeter {
22+
23+
/**
24+
* This is an internal method not meant for general use.
25+
* <p>
26+
* Force a rollover of the values returned by a step meter and never roll over again
27+
* after.
28+
*/
29+
void _closingRollover();
30+
31+
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,16 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
104104
.merge(DistributionStatisticConfig.DEFAULT);
105105
}
106106

107+
@Override
108+
public void close() {
109+
stop();
110+
if (!isPublishing()) {
111+
getMeters().stream()
112+
.filter(StepMeter.class::isInstance)
113+
.map(StepMeter.class::cast)
114+
.forEach(StepMeter::_closingRollover);
115+
}
116+
super.close();
117+
}
118+
107119
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepTimer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* @author Jon Schneider
3030
*/
31-
public class StepTimer extends AbstractTimer {
31+
public class StepTimer extends AbstractTimer implements StepMeter {
3232

3333
private final LongAdder count = new LongAdder();
3434

@@ -79,4 +79,9 @@ public double max(final TimeUnit unit) {
7979
return TimeUtils.nanosToUnit(max.poll(), unit);
8080
}
8181

82+
@Override
83+
public void _closingRollover() {
84+
countTotal.closingRollover();
85+
}
86+
8287
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepTuple2.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ private void rollCount(long now) {
7373
}
7474
}
7575

76+
/**
77+
* Intended for internal use. Rolls the values regardless of the clock or current
78+
* time.
79+
*/
80+
void closingRollover() {
81+
// ensure rollover does not happen again
82+
lastInitPos.set(Long.MAX_VALUE);
83+
t1Previous = t1Supplier.get();
84+
t2Previous = t2Supplier.get();
85+
}
86+
7687
/**
7788
* @return The value for the last completed interval.
7889
*/

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepValue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,13 @@ public V poll() {
7373
return previous;
7474
}
7575

76+
/**
77+
* internal use only; intentionally left package-private
78+
*/
79+
void closingRollover() {
80+
// make sure value does not roll over again if passing a step boundary
81+
lastInitPos.set(Long.MAX_VALUE);
82+
previous = valueSupplier().get();
83+
}
84+
7685
}

micrometer-core/src/test/java/io/micrometer/core/instrument/push/PushMeterRegistryTest.java

Lines changed: 92 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,20 @@
2222
import io.micrometer.core.instrument.step.StepMeterRegistry;
2323
import io.micrometer.core.instrument.step.StepRegistryConfig;
2424
import io.micrometer.core.instrument.util.NamedThreadFactory;
25-
import org.junit.jupiter.api.AfterEach;
2625
import org.junit.jupiter.api.Test;
2726

2827
import java.time.Duration;
29-
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.ThreadFactory;
31-
import java.util.concurrent.TimeUnit;
28+
import java.util.ArrayDeque;
29+
import java.util.Arrays;
30+
import java.util.Deque;
31+
import java.util.Map;
32+
import java.util.concurrent.*;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.function.ToDoubleFunction;
3435
import java.util.function.ToLongFunction;
3536

37+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
38+
import static java.util.concurrent.TimeUnit.SECONDS;
3639
import static org.assertj.core.api.Assertions.assertThat;
3740
import static org.assertj.core.api.Assertions.assertThatCode;
3841

@@ -62,15 +65,9 @@ public String get(String key) {
6265

6366
CountDownLatch latch = new CountDownLatch(2);
6467

65-
PushMeterRegistry pushMeterRegistry = new ThrowingPushMeterRegistry(config, latch);
66-
67-
@AfterEach
68-
void cleanUp() {
69-
pushMeterRegistry.close();
70-
}
71-
7268
@Test
7369
void whenUncaughtExceptionInPublish_taskStillScheduled() throws InterruptedException {
70+
PushMeterRegistry pushMeterRegistry = new ThrowingPushMeterRegistry(config, latch);
7471
pushMeterRegistry.start(threadFactory);
7572
assertThat(latch.await(500, TimeUnit.MILLISECONDS))
7673
.as("publish should continue to be scheduled even if an uncaught exception is thrown")
@@ -79,17 +76,98 @@ void whenUncaughtExceptionInPublish_taskStillScheduled() throws InterruptedExcep
7976

8077
@Test
8178
void whenUncaughtExceptionInPublish_closeRegistrySuccessful() {
79+
PushMeterRegistry pushMeterRegistry = new ThrowingPushMeterRegistry(config, latch);
8280
assertThatCode(() -> pushMeterRegistry.close()).doesNotThrowAnyException();
8381
}
8482

8583
@Test
8684
@Issue("#3712")
8785
void publishOnlyHappensOnceWithMultipleClose() {
88-
pushMeterRegistry = new CountingPushMeterRegistry(config, Clock.SYSTEM);
86+
CountingPushMeterRegistry pushMeterRegistry = new CountingPushMeterRegistry(config, Clock.SYSTEM);
8987
pushMeterRegistry.close();
90-
assertThat(((CountingPushMeterRegistry) pushMeterRegistry).publishCount.get()).isOne();
88+
assertThat(pushMeterRegistry.publishCount.get()).isOne();
9189
pushMeterRegistry.close();
92-
assertThat(((CountingPushMeterRegistry) pushMeterRegistry).publishCount.get()).isOne();
90+
assertThat(pushMeterRegistry.publishCount.get()).isOne();
91+
}
92+
93+
@Test
94+
@Issue("#3711")
95+
void scheduledPublishOverlapWithPublishOnClose() throws InterruptedException {
96+
MockClock clock = new MockClock();
97+
CyclicBarrier barrier = new CyclicBarrier(2);
98+
OverlappingStepMeterRegistry overlappingStepMeterRegistry = new OverlappingStepMeterRegistry(config, clock,
99+
barrier);
100+
Counter c1 = overlappingStepMeterRegistry.counter("c1");
101+
Counter c2 = overlappingStepMeterRegistry.counter("c2");
102+
c1.increment();
103+
c2.increment(2.5);
104+
clock.add(config.step());
105+
106+
// simulated scheduled publish
107+
Thread scheduledPublishingThread = new Thread(
108+
() -> ((PushMeterRegistry) overlappingStepMeterRegistry).publishSafely(),
109+
"scheduledMetricsPublisherThread");
110+
scheduledPublishingThread.start();
111+
// publish on shutdown
112+
Thread onClosePublishThread = new Thread(overlappingStepMeterRegistry::close, "shutdownHookThread");
113+
onClosePublishThread.start();
114+
scheduledPublishingThread.join();
115+
onClosePublishThread.join();
116+
117+
assertThat(overlappingStepMeterRegistry.publishes).as("only one publish happened").hasSize(1);
118+
Deque<Double> firstPublishValues = overlappingStepMeterRegistry.publishes.get(0);
119+
assertThat(firstPublishValues.pop()).isEqualTo(1);
120+
assertThat(firstPublishValues.pop()).isEqualTo(2.5);
121+
}
122+
123+
private static class OverlappingStepMeterRegistry extends StepMeterRegistry {
124+
125+
private final AtomicInteger numberOfPublishes = new AtomicInteger();
126+
127+
private final Map<Integer, Deque<Double>> publishes = new ConcurrentHashMap<>();
128+
129+
private final CyclicBarrier barrier;
130+
131+
OverlappingStepMeterRegistry(StepRegistryConfig config, Clock clock, CyclicBarrier barrier) {
132+
super(config, clock);
133+
this.barrier = barrier;
134+
}
135+
136+
@Override
137+
protected TimeUnit getBaseTimeUnit() {
138+
return SECONDS;
139+
}
140+
141+
@Override
142+
protected void publish() {
143+
try {
144+
barrier.await(100, MILLISECONDS);
145+
}
146+
catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
147+
throw new RuntimeException(e);
148+
}
149+
int publishIndex = numberOfPublishes.getAndIncrement();
150+
getMeters().stream()
151+
.filter(meter -> meter instanceof Counter)
152+
.map(meter -> (Counter) meter)
153+
.forEach(counter -> publishes.merge(publishIndex, new ArrayDeque<>(Arrays.asList(counter.count())),
154+
(l1, l2) -> {
155+
l1.addAll(l2);
156+
return l1;
157+
}));
158+
}
159+
160+
@Override
161+
public void close() {
162+
try {
163+
barrier.await(100, MILLISECONDS);
164+
}
165+
catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
166+
throw new RuntimeException(e);
167+
}
168+
super.close();
169+
}
170+
93171
}
94172

95173
static class CountingPushMeterRegistry extends PushMeterRegistry {

0 commit comments

Comments
 (0)