diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 965f8074f8014..552e7e72effd8 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -833,7 +833,7 @@ private void flushCurrentBatch() { */ private void maybeFlushCurrentBatch(long currentTimeMs) { if (currentBatch != null) { - if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) { + if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) { flushCurrentBatch(); } } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index dfbbdf048bc20..de1aa21f3f116 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -4779,6 +4779,7 @@ public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception { assertTrue(write1.isCompletedExceptionally()); verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L); } + @Test public void testCoordinatorExecutor() { Duration writeTimeout = Duration.ofMillis(1000); @@ -4866,6 +4867,81 @@ public void testCoordinatorExecutor() { assertTrue(write1.isDone()); } + @Test + public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception { + // Provides the runtime clock; we will advance it. + MockTimer clockTimer = new MockTimer(); + // Used for scheduling timer tasks; we won't advance it to avoid a timer-triggered batch flush. + MockTimer schedulerTimer = new MockTimer(); + + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(clockTimer.time()) + .withTimer(schedulerTimer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(ACTIVE, ctx.state); + assertNull(ctx.currentBatch); + + // Write #1. + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record1"), "response1") + ); + assertFalse(write1.isDone()); + assertNotNull(ctx.currentBatch); + assertEquals(0, writer.entries(TP).size()); + + // Verify that the linger timeout task is created; there will also be a default write timeout task. + assertEquals(2, schedulerTimer.size()); + + // Advance past the linger time. + clockTimer.advanceClock(11); + + // At this point, there are still two scheduled tasks; the linger task has not fired + // because we did not advance the schedulerTimer. + assertEquals(2, schedulerTimer.size()); + + // Write #2. + CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record2"), "response2") + ); + + // The batch should have been flushed. + assertEquals(1, writer.entries(TP).size()); + + // Because flushing the batch cancels the linger task, there should now be two write timeout tasks. + assertEquals(2, schedulerTimer.size()); + + // Verify batch contains both two records + MemoryRecords batch = writer.entries(TP).get(0); + RecordBatch recordBatch = batch.firstBatch(); + assertEquals(2, recordBatch.countOrNull()); + + // Commit and verify that writes are completed. + writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + // Now that all scheduled tasks have been cancelled, the scheduler queue should be empty. + assertEquals(0, schedulerTimer.size()); + } + private static , U> ArgumentMatcher> coordinatorMatcher( CoordinatorRuntime runtime, TopicPartition tp