Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ private void flushCurrentBatch() {
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would prefer to remove this condition, as a time-based task already exists. This approach would also mean we are not changing the current behavior, since the condition was previously a no-op

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, time-based scheduled tasks will ensure that the current batch is flushed to the log once it has passed the append linger time.

Perhaps performing a conditional check here is intended to detect as promptly as possible when the current batch has passed the append linger time, so that flushing to the log can be carried out more promptly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a unit test here? Since the existing tests don't update the mock time, this bug was not disclosed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I think that we can keep the condition here. I agree with adding a unit test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you all for your feedback and suggestions. I will add a unit test to cover this scenario.

flushCurrentBatch();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4779,6 +4779,7 @@ public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception {
assertTrue(write1.isCompletedExceptionally());
verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
}

@Test
public void testCoordinatorExecutor() {
Duration writeTimeout = Duration.ofMillis(1000);
Expand Down Expand Up @@ -4866,6 +4867,81 @@ public void testCoordinatorExecutor() {
assertTrue(write1.isDone());
}

@Test
public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception {
// Provides the runtime clock; we will advance it.
MockTimer clockTimer = new MockTimer();
// Used for scheduling timer tasks; we won't advance it to avoid a timer-triggered batch flush.
MockTimer schedulerTimer = new MockTimer();

MockPartitionWriter writer = new MockPartitionWriter();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(clockTimer.time())
.withTimer(schedulerTimer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(ACTIVE, ctx.state);
assertNull(ctx.currentBatch);

// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record1"), "response1")
);
assertFalse(write1.isDone());
assertNotNull(ctx.currentBatch);
assertEquals(0, writer.entries(TP).size());

// Verify that the linger timeout task is created; there will also be a default write timeout task.
assertEquals(2, schedulerTimer.size());

// Advance past the linger time.
clockTimer.advanceClock(11);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this to line#4921? It makes more sense there, since the goal of advancing the clockTimer is to ensure flushCurrentBatch is executed during writing #2, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the purpose of advancing the clockTimer is to ensure that flushCurrentBatch is executed when writing #2.

However, I think that after advancing the clockTimer, we should check the number of tasks in the schedulerTimer to ensure that the linger task still exists (has not been executed or canceled) before writing #2.

This ensures that flushCurrentBatch is executed when writing #2, rather than being triggered by the linger timer task.


// At this point, there are still two scheduled tasks; the linger task has not fired
// because we did not advance the schedulerTimer.
assertEquals(2, schedulerTimer.size());

// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record2"), "response2")
);

// The batch should have been flushed.
assertEquals(1, writer.entries(TP).size());

// Because flushing the batch cancels the linger task, there should now be two write timeout tasks.
assertEquals(2, schedulerTimer.size());

// Verify batch contains both two records
MemoryRecords batch = writer.entries(TP).get(0);
RecordBatch recordBatch = batch.firstBatch();
assertEquals(2, recordBatch.countOrNull());

// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
// Now that all scheduled tasks have been cancelled, the scheduler queue should be empty.
assertEquals(0, schedulerTimer.size());
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down