From f3ae82d4ebe2a37d7fb67cee855999ff056ea0d5 Mon Sep 17 00:00:00 2001 From: Peter Winckles Date: Fri, 21 Feb 2025 10:12:53 -0600 Subject: [PATCH 1/2] Fix race updating metrics with writer is stopped This fixes another race condition that causes metrics to be lost when they are updated while the writer is stopped and after the monitorable has been re-added to the writer. In this case, the update needs to set the initial value on `PcpValueInfo` so that the correct value is written when the writer is started. --- .../java/io/pcp/parfait/dxm/PcpMmvWriter.java | 20 ++++++++++++---- .../java/io/pcp/parfait/dxm/PcpValueInfo.java | 6 ++++- .../dxm/PcpMmvWriterIntegrationTest.java | 23 +++++++++++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java index dbbd2a8f..6f527d5f 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java @@ -167,6 +167,7 @@ public void putBytes(ByteBuffer buffer, String value) { private volatile State state = State.STOPPED; private final Monitor stateMonitor = new Monitor(); private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED); + private final Monitor.Guard isStopped = stateMonitor.newGuard(() -> state == State.STOPPED); private volatile Duration maxWaitStart = Duration.ofSeconds(10); private volatile boolean usePerMetricLock = true; private final Map perMetricByteBuffers = newConcurrentMap(); @@ -323,12 +324,23 @@ public final void updateMetric(MetricName name, Object value) { // implementation here is a little complicated to avoid taking a lock on the happy paths. if (state == State.STARTED) { doUpdateMetric(name, value); - } else if (state == State.STARTING) { - if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) { - // Leave the monitor immediately because we only care about being notified about the state change + } else if (stateMonitor.enterIf(isStopped)) { + // In this case, the writer has not been started yet, but it's possible the monitorable has already been + // added back to the writer. If it has, we need to update the initial value so that it gets written + // correctly when the writer is started. If it's not present, then we don't need to do anything because the + // monitorable will be re-added in the future with the correct value. + try { + PcpValueInfo info = metricData.get(name); + if (info != null) { + info.setInitialValue(value); + } + } finally { stateMonitor.leave(); - doUpdateMetric(name, value); } + } else if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) { + // Leave the monitor immediately because we only care about being notified about the state change + stateMonitor.leave(); + doUpdateMetric(name, value); } } diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java b/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java index c4d8d3c8..0ca17b68 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java @@ -32,7 +32,7 @@ public final class PcpValueInfo implements PcpOffset,MmvWritable { private static final int VALUE_LENGTH = 32; private final MetricName metricName; - private final Object initialValue; + private volatile Object initialValue; private final PcpMetricInfo metricInfo; private final Instance instance; private final PcpString largeValue; @@ -78,6 +78,10 @@ private Object getInitialValue() { return initialValue; } + public void setInitialValue(Object initialValue) { + this.initialValue = initialValue; + } + private int getInstanceOffset() { return instance == null ? 0 : instance.getOffset(); } diff --git a/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java b/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java index d7472562..4b0e5f3d 100644 --- a/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java +++ b/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java @@ -203,6 +203,29 @@ public void putBytes(ByteBuffer buffer, Number value) { assertMetric("mmv." + order.get(0), is("10.000")); } + @Test + public void metricUpdatesWhileResettingWriterShouldNotBeLostWhenRecordedBeforeWriterStarted() throws Exception { + pcpMmvWriterV1.reset(); + pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1); + + pcpMmvWriterV1.start(); + + waitForReload(); + + assertMetric("mmv.value1", is("1.000")); + + pcpMmvWriterV1.reset(); + + pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1); + pcpMmvWriterV1.updateMetric(MetricName.parse("value1"), 10); + + pcpMmvWriterV1.start(); + + waitForReload(); + + assertMetric("mmv.value1", is("10.000")); + } + private void assertMetric(String metricName, Matcher expectedValue) throws Exception { String actual = pcpClient.getMetric(metricName); assertThat(actual, expectedValue); From 73b0aa4fee4a0f7dcb07f7fad40b0c67f9558442 Mon Sep 17 00:00:00 2001 From: Peter Winckles Date: Fri, 21 Feb 2025 10:29:16 -0600 Subject: [PATCH 2/2] Add caching to improve speed adding monitorables Adds a cache to the `MetricNameMapper` and `MetricNameValidator`. This improves performance of mapping metric names because a regex no longer has to be executed over and over again. The validator cache helps because the metric name no longer has to be encoded over and over again. Neither of these caches expire entries, but this is not an issue because it's currently not possible to remove monitorables from Parfait. --- .../pcp/parfait/dxm/MetricNameValidator.java | 11 +++++- .../parfait/pcp/CachingMetricNameMapper.java | 37 +++++++++++++++++++ .../io/pcp/parfait/pcp/PcpMonitorBridge.java | 5 ++- 3 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java b/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java index 791a6127..a91a83b0 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java @@ -16,12 +16,16 @@ package io.pcp.parfait.dxm; +import java.util.HashSet; +import java.util.Set; + import static io.pcp.parfait.dxm.PcpMmvWriter.PCP_CHARSET; class MetricNameValidator { private final int nameLimit; private final int domainLimit; + private final Set validNames = new HashSet<>(); MetricNameValidator(int nameLimit, int domainLimit) { this.nameLimit = nameLimit; @@ -29,8 +33,11 @@ class MetricNameValidator { } void validateNameConstraints(MetricName metricName) { - validateName(metricName); - validateInstance(metricName); + if (!validNames.contains(metricName)) { + validateName(metricName); + validateInstance(metricName); + validNames.add(metricName); + } } private void validateInstance(MetricName metricName) { diff --git a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java new file mode 100644 index 00000000..4ea0a0c8 --- /dev/null +++ b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java @@ -0,0 +1,37 @@ +/* + * Copyright 2009-2017 Aconex + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.pcp.parfait.pcp; + +import io.pcp.parfait.dxm.MetricName; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class CachingMetricNameMapper implements MetricNameMapper { + + private final Map cache = new ConcurrentHashMap<>(); + private final MetricNameMapper innerMapper; + + public CachingMetricNameMapper(MetricNameMapper mapper) { + this.innerMapper = mapper; + } + + @Override + public MetricName map(String name) { + return cache.computeIfAbsent(name, innerMapper::map); + } +} diff --git a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java index e840b6a5..5e36aa3c 100644 --- a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java +++ b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java @@ -67,12 +67,13 @@ public class PcpMonitorBridge implements MonitoringView { private final TextSource shortTextSource; private final TextSource longTextSource; - private volatile PcpWriter pcpWriter; + private final PcpWriter pcpWriter; private volatile boolean started; public PcpMonitorBridge(PcpWriter writer) { - this(writer, MetricNameMapper.PASSTHROUGH_MAPPER, DEFAULT_SHORT_TEXT_SOURCE, + this(writer, new CachingMetricNameMapper(MetricNameMapper.PASSTHROUGH_MAPPER), + DEFAULT_SHORT_TEXT_SOURCE, DEFAULT_LONG_TEXT_SOURCE); }