Skip to content

Conversation

@majialoong
Copy link
Contributor

@majialoong majialoong commented Oct 21, 2025

This PR fixed the time comparison logic in
CoordinatorRuntime#maybeFlushCurrentBatch to ensure that the batch is
flushed when the elapsed time since appendTimeMs exceeds the
appendLingerMs parameter.

This issue is also mentioned here.

Reviewers: David Jacot [email protected], Chia-Ping Tsai
[email protected]

private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would prefer to remove this condition, as a time-based task already exists. This approach would also mean we are not changing the current behavior, since the condition was previously a no-op

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, time-based scheduled tasks will ensure that the current batch is flushed to the log once it has passed the append linger time.

Perhaps performing a conditional check here is intended to detect as promptly as possible when the current batch has passed the append linger time, so that flushing to the log can be carried out more promptly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a unit test here? Since the existing tests don't update the mock time, this bug was not disclosed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I think that we can keep the condition here. I agree with adding a unit test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you all for your feedback and suggestions. I will add a unit test to cover this scenario.

@majialoong
Copy link
Contributor Author

Hello @AndrewJSchofield and @dajac , if you have time, could you please take a look at this issue? Thank you very much, and I look forward to your suggestions.

@github-actions github-actions bot removed the triage PRs from the community label Oct 22, 2025
@AndrewJSchofield
Copy link
Member

Hello @AndrewJSchofield and @dajac , if you have time, could you please take a look at this issue? Thank you very much, and I look forward to your suggestions.

This is much more in @dajac's area than mine.

@AndrewJSchofield AndrewJSchofield removed their request for review October 23, 2025 10:09
@majialoong
Copy link
Contributor Author

Hello @chia7712 and @dajac , I have added a unit test to cover this scenario. When you have time, could you please review this patch again? Thanks !

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@majialoong thanks for this patch. overall LGTM.

assertEquals(2, schedulerTimer.size());

// Advance past the linger time.
clockTimer.advanceClock(11);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this to line#4921? It makes more sense there, since the goal of advancing the clockTimer is to ensure flushCurrentBatch is executed during writing #2, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the purpose of advancing the clockTimer is to ensure that flushCurrentBatch is executed when writing #2.

However, I think that after advancing the clockTimer, we should check the number of tasks in the schedulerTimer to ensure that the linger task still exists (has not been executed or canceled) before writing #2.

This ensures that flushCurrentBatch is executed when writing #2, rather than being triggered by the linger timer task.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712
Copy link
Member

I will merge this tomorrow if @dajac and @AndrewJSchofield have no objections. I will also backport it to 4.1 and 4.0.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix, @majialoong! I left a few nits. Otherwise, LGTM.

MockPartitionWriter writer = new MockPartitionWriter();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We use four spaces to indent code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I’ve addressed this.

Comment on lines 4905 to 4906
"write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record1"), "response1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Comment on lines 4924 to 4925
"write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record2"), "response2")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks

@chia7712 chia7712 merged commit 82a1935 into apache:trunk Oct 24, 2025
24 checks passed
chia7712 pushed a commit that referenced this pull request Oct 24, 2025
…maybeFlushCurrentBatch (#20739)

This PR fixed the time comparison logic in
`CoordinatorRuntime#maybeFlushCurrentBatch` to ensure that the batch is
flushed when the elapsed time since `appendTimeMs` exceeds the
`appendLingerMs` parameter.

This issue is also mentioned [here](
https://github.com/apache/kafka/pull/20653/files#r2442452104).

Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
 <[email protected]>
@chia7712
Copy link
Member

@majialoong could you please file a PR for 4.0? I got following error during backport

> Task :coordinator-common:checkstyleTest
[ant:checkstyle] [ERROR] /home/chia7712/project/kafka/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:17:1: NCSS for this file is 2,026 (max allowed is 2,000). [JavaNCSS]

> Task :group-coordinator:compileTestJava
Note: /home/chia7712/project/kafka/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

> Task :coordinator-common:checkstyleTest FAILED
[ant:checkstyle] [ERROR] /home/chia7712/project/kafka/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:110:1: Class Fan-Out Complexity is 53 (max allowed is 52). [ClassFanOutComplexity]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants