Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState;

import java.util.Arrays;
Expand Down Expand Up @@ -117,9 +118,9 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
private final Sensor eventPurgatoryTimeSensor;

/**
* Sensor to measure the flush time.
* Sensor to measure the flush time and rate.
*/
private final Sensor flushTimeSensor;
private final Sensor flushSensor;

public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
this.metrics = Objects.requireNonNull(metrics);
Expand Down Expand Up @@ -205,8 +206,14 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
"The " + suffix + " flush time in milliseconds"
)
);
this.flushTimeSensor = metrics.sensor(this.metricsGroup + "-FlushTime");
this.flushTimeSensor.add(flushTimeHistogram);
this.flushSensor = metrics.sensor(this.metricsGroup + "-Flush");
this.flushSensor.add(flushTimeHistogram);
this.flushSensor.add(
metrics.metricName(
"batch-flush-rate",
this.metricsGroup,
"The flushes per second."),
new Rate(TimeUnit.SECONDS, new WindowedCount()));
}

/**
Expand Down Expand Up @@ -234,7 +241,7 @@ public void close() {
metrics.removeSensor(eventQueueTimeSensor.name());
metrics.removeSensor(eventProcessingTimeSensor.name());
metrics.removeSensor(eventPurgatoryTimeSensor.name());
metrics.removeSensor(flushTimeSensor.name());
metrics.removeSensor(flushSensor.name());
}

/**
Expand Down Expand Up @@ -296,7 +303,7 @@ public void recordEventPurgatoryTime(long purgatoryTimeMs) {

@Override
public void recordFlushTime(long durationMs) {
flushTimeSensor.record(durationMs);
flushSensor.record(durationMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testMetricNames() {
kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
kafkaMetricName(metrics, "batch-flush-time-ms-p99"),
kafkaMetricName(metrics, "batch-flush-time-ms-p999")
kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
kafkaMetricName(metrics, "batch-flush-rate")
);

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
Expand Down Expand Up @@ -341,6 +342,19 @@ private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.M
assertEquals(count, (long) metrics.metric(metricName).metricValue());
}

@Test
public void testFlushRateSensor() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);

CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordFlushTime((i + 1) * 1000));

org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "batch-flush-rate");
KafkaMetric metric = metrics.metrics().get(metricName);
assertEquals(0.1, metric.metricValue()); // 'total / window_s'
}

private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
return metrics.metricName(name, METRICS_GROUP, "", keyValue);
}
Expand Down
5 changes: 5 additions & 0 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,11 @@ <h4 class="anchor-heading"><a id="group_coordinator_monitoring" class="anchor-li
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p95|p99|p999]</td>
<td>The time that a batch took to be flushed to the local partition</td>
</tr>
<tr>
<td>Batch Flush Rate</td>
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-rate</td>
<td>The number of batches flushed per second</td>
</tr>
<tr>
<td>Group Count, per group type</td>
<td>kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic}</td>
Expand Down