Commit a3f2b58

KAFKA-19825 [1/2]: Add effective batch linger time metric

1 parent 2c44448

6 files changed: +162 -0 lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 1 addition & 0 deletions
@@ -789,6 +789,7 @@ private void flushCurrentBatch() {
         }
 
         long flushStartMs = time.milliseconds();
+        runtimeMetrics.recordLingerTime(flushStartMs - currentBatch.appendTimeMs);
         // Write the records to the log and update the last written offset.
         long offset = partitionWriter.append(
             tp,
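
The added line computes the effective linger as the elapsed time between the batch's creation on first append (currentBatch.appendTimeMs) and the start of the flush. A minimal standalone sketch of that measurement pattern, with illustrative names rather than the Kafka classes:

// Hedged sketch: the effective linger is the time between when a batch is
// first created and when its flush starts. Names and timings are illustrative.
final class BatchLingerSketch {
    public static void main(String[] args) throws InterruptedException {
        long appendTimeMs = System.currentTimeMillis();  // batch created on first append
        Thread.sleep(15);                                // records accumulate while the batch lingers
        long flushStartMs = System.currentTimeMillis();  // flush triggered by size limit or linger expiry
        long effectiveLingerMs = flushStartMs - appendTimeMs;
        System.out.println("effective batch linger = " + effectiveLingerMs + " ms");
    }
}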

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java

Lines changed: 7 additions & 0 deletions
@@ -60,6 +60,13 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
      */
     void recordEventPurgatoryTime(long durationMs);
 
+    /**
+     * Record the effective batch linger time.
+     *
+     * @param durationMs The linger time in milliseconds.
+     */
+    void recordLingerTime(long durationMs);
+
     /**
      * Record the flush time.
      *

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java

Lines changed: 25 additions & 0 deletions
@@ -58,6 +58,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
      */
     public static final String EVENT_PURGATORY_TIME_METRIC_NAME = "event-purgatory-time-ms";
 
+    /**
+     * The effective batch linger time metric name.
+     */
+    public static final String BATCH_LINGER_TIME_METRIC_NAME = "batch-linger-time-ms";
+
     /**
      * The flush time metric name.
      */
@@ -116,6 +121,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
      */
     private final Sensor eventPurgatoryTimeSensor;
 
+    /**
+     * Sensor to measure the effective batch linger time.
+     */
+    private final Sensor lingerTimeSensor;
+
     /**
      * Sensor to measure the flush time.
      */
@@ -199,6 +209,15 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
         this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime");
         this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);
 
+        KafkaMetricHistogram lingerTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
+            suffix -> kafkaMetricName(
+                BATCH_LINGER_TIME_METRIC_NAME + "-" + suffix,
+                "The " + suffix + " effective linger time in milliseconds"
+            )
+        );
+        this.lingerTimeSensor = metrics.sensor(this.metricsGroup + "-LingerTime");
+        this.lingerTimeSensor.add(lingerTimeHistogram);
+
         KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
             suffix -> kafkaMetricName(
                 BATCH_FLUSH_TIME_METRIC_NAME + "-" + suffix,
@@ -234,6 +253,7 @@ public void close() {
         metrics.removeSensor(eventQueueTimeSensor.name());
         metrics.removeSensor(eventProcessingTimeSensor.name());
         metrics.removeSensor(eventPurgatoryTimeSensor.name());
+        metrics.removeSensor(lingerTimeSensor.name());
         metrics.removeSensor(flushTimeSensor.name());
     }
 
@@ -294,6 +314,11 @@ public void recordEventPurgatoryTime(long purgatoryTimeMs) {
         eventPurgatoryTimeSensor.record(purgatoryTimeMs);
     }
 
+    @Override
+    public void recordLingerTime(long durationMs) {
+        lingerTimeSensor.record(durationMs);
+    }
+
     @Override
     public void recordFlushTime(long durationMs) {
         flushTimeSensor.record(durationMs);
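
For reference, the same sensor-plus-histogram shape can be reproduced with the public org.apache.kafka.common.metrics API. This is a hedged, standalone sketch, not the implementation above: it uses Max and Percentiles instead of the internal KafkaMetricHistogram helper, and the group name and sizing values are illustrative. It only requires kafka-clients on the classpath.

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

// Hedged sketch: a latency sensor exposing "-max" and a percentile metric,
// mirroring the shape of batch-linger-time-ms-[max|p99] without the internal helper.
public final class LingerSensorSketch {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            String group = "example-coordinator-metrics";  // illustrative group name
            Sensor lingerTimeSensor = metrics.sensor("ExampleLingerTime");
            lingerTimeSensor.add(
                metrics.metricName("batch-linger-time-ms-max", group,
                    "The max effective linger time in milliseconds"),
                new Max()
            );
            lingerTimeSensor.add(new Percentiles(
                4000,                 // bytes of histogram storage (illustrative)
                60_000.0,             // max expected value in ms (illustrative)
                BucketSizing.LINEAR,
                new Percentile(metrics.metricName("batch-linger-time-ms-p99", group,
                    "The p99 effective linger time in milliseconds"), 99.0)
            ));

            lingerTimeSensor.record(12.0);  // record one observed linger of 12 ms
            metrics.metrics().forEach((name, metric) ->
                System.out.println(name.name() + " = " + metric.metricValue()));
        }
    }
}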

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java

Lines changed: 10 additions & 0 deletions
@@ -32,6 +32,7 @@
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
@@ -74,6 +75,11 @@ public void testMetricNames() {
             kafkaMetricName(metrics, "event-purgatory-time-ms-p95"),
             kafkaMetricName(metrics, "event-purgatory-time-ms-p99"),
             kafkaMetricName(metrics, "event-purgatory-time-ms-p999"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-max"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p50"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p95"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p99"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p999"),
             kafkaMetricName(metrics, "batch-flush-time-ms-max"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
@@ -236,6 +242,7 @@ public void testEventQueueSizeMetricsGroupIsolation() {
         EVENT_QUEUE_TIME_METRIC_NAME,
         EVENT_PROCESSING_TIME_METRIC_NAME,
         EVENT_PURGATORY_TIME_METRIC_NAME,
+        BATCH_LINGER_TIME_METRIC_NAME,
         BATCH_FLUSH_TIME_METRIC_NAME
     })
     public void testHistogramMetrics(String metricNamePrefix) {
@@ -255,6 +262,9 @@ public void testHistogramMetrics(String metricNamePrefix) {
                 case EVENT_PURGATORY_TIME_METRIC_NAME:
                     runtimeMetrics.recordEventPurgatoryTime(i);
                     break;
+                case BATCH_LINGER_TIME_METRIC_NAME:
+                    runtimeMetrics.recordLingerTime(i);
+                    break;
                 case BATCH_FLUSH_TIME_METRIC_NAME:
                     runtimeMetrics.recordFlushTime(i);
             }

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 114 additions & 0 deletions
@@ -49,6 +49,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
@@ -4182,6 +4183,119 @@ public void testEmptyBatch() throws Exception {
         assertNull(complete1.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testRecordAppendLingerTime() throws Exception {
+        MockTimer timer = new MockTimer();
+
+        // Writer sleeps for 10ms before appending records.
+        MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false);
+        CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(0, 2), "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(2, 3), "response2")
+        );
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+        verify(runtimeMetrics, times(0)).recordFlushTime(10);
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be created.
+        long secondBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(3, 4), "response3")
+        );
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        // The new batch's timestamp comes from before the flush.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, records.subList(0, 3))
+        ), writer.entries(TP));
+        verify(runtimeMetrics, times(1)).recordLingerTime(0);
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(secondBatchTimestamp, records.subList(0, 3)),
+            records(secondBatchTimestamp, records.subList(3, 4))
+        ), writer.entries(TP));
+        verify(runtimeMetrics, times(1)).recordLingerTime(21);
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testRecordFlushTime() throws Exception {
         MockTimer timer = new MockTimer();

docs/ops.html

Lines changed: 5 additions & 0 deletions
@@ -1890,6 +1890,11 @@ <h4 class="anchor-heading"><a id="group_coordinator_monitoring" class="anchor-li
         <td>kafka.server:type=group-coordinator-metrics,name=event-purgatory-time-ms-[max|p50|p95|p99|p999]</td>
         <td>The time that an event waited in the purgatory before being completed</td>
       </tr>
+      <tr>
+        <td>Batch Linger Time (Ms)</td>
+        <td>kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-[max|p50|p95|p99|p999]</td>
+        <td>The effective linger time of a batch before being flushed to the local partition</td>
+      </tr>
       <tr>
         <td>Batch Flush Time (Ms)</td>
         <td>kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p95|p99|p999]</td>
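
Once a broker exposes these group-coordinator metrics, the new MBeans can be discovered over JMX. A hedged sketch using only the standard JDK JMX client; the host, port, and the fact that remote JMX is enabled are assumptions about the deployment, not defaults:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hedged sketch: list the group-coordinator metric MBeans on a broker whose
// remote JMX endpoint is reachable at localhost:9999 (an assumed configuration).
public final class ListCoordinatorMetrics {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            ObjectName pattern = new ObjectName("kafka.server:type=group-coordinator-metrics,*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                System.out.println(name);  // includes batch-linger-time-ms-* once this commit is deployed
            }
        }
    }
}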
