diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java index af775c7c45118..6c5e957e25ae9 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState; import java.util.Arrays; @@ -117,9 +118,9 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics private final Sensor eventPurgatoryTimeSensor; /** - * Sensor to measure the flush time. + * Sensor to measure the flush time and rate. */ - private final Sensor flushTimeSensor; + private final Sensor flushSensor; public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) { this.metrics = Objects.requireNonNull(metrics); @@ -205,8 +206,14 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) { "The " + suffix + " flush time in milliseconds" ) ); - this.flushTimeSensor = metrics.sensor(this.metricsGroup + "-FlushTime"); - this.flushTimeSensor.add(flushTimeHistogram); + this.flushSensor = metrics.sensor(this.metricsGroup + "-Flush"); + this.flushSensor.add(flushTimeHistogram); + this.flushSensor.add( + metrics.metricName( + "batch-flush-rate", + this.metricsGroup, + "The flushes per second."), + new Rate(TimeUnit.SECONDS, new WindowedCount())); } /** @@ -234,7 +241,7 @@ public void close() { metrics.removeSensor(eventQueueTimeSensor.name()); metrics.removeSensor(eventProcessingTimeSensor.name()); metrics.removeSensor(eventPurgatoryTimeSensor.name()); - metrics.removeSensor(flushTimeSensor.name()); + metrics.removeSensor(flushSensor.name()); } /** @@ -296,7 +303,7 @@ public void recordEventPurgatoryTime(long purgatoryTimeMs) { @Override public void recordFlushTime(long durationMs) { - flushTimeSensor.record(durationMs); + flushSensor.record(durationMs); } @Override diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java index 68f152f2bea08..be0a3fcf397b1 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java @@ -78,7 +78,8 @@ public void testMetricNames() { kafkaMetricName(metrics, "batch-flush-time-ms-p50"), kafkaMetricName(metrics, "batch-flush-time-ms-p95"), kafkaMetricName(metrics, "batch-flush-time-ms-p99"), - kafkaMetricName(metrics, "batch-flush-time-ms-p999") + kafkaMetricName(metrics, "batch-flush-time-ms-p999"), + kafkaMetricName(metrics, "batch-flush-rate") ); try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) { @@ -341,6 +342,19 @@ private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.M assertEquals(count, (long) metrics.metric(metricName).metricValue()); } + @Test + public void testFlushRateSensor() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordFlushTime((i + 1) * 1000)); + + org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "batch-flush-rate"); + KafkaMetric metric = metrics.metrics().get(metricName); + assertEquals(0.1, metric.metricValue()); // 'total / window_s' + } + private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) { return metrics.metricName(name, METRICS_GROUP, "", keyValue); } diff --git a/docs/ops.html b/docs/ops.html index fcef480f2c91a..798b46ae2f241 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1895,6 +1895,11 @@