-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-3562: Add AckMode.RECORD_FILTERED #4108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
dc6bc25 to
c0bb203
Compare
Signed-off-by: Chaedong Im <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to use public modifier on test classes or methods any longer.
| * @author Chaedong Im | ||
| * @see AckModeRecordWithFilteringTest | ||
| */ | ||
| public class AckModeRecordFilteredTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can drop public modifier from tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it. Thanks 😊
|
|
||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| @Test | ||
| public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No public modifier needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it. Thanks 😊
|
|
||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| @Test | ||
| public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no public modifier needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it. Thanks 😊
| * @author Chaedong Im | ||
| * @see AckModeRecordFilteredTest | ||
| */ | ||
| public class AckModeRecordWithFilteringTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No public modifier needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it. Thanks 😊
| */ | ||
| RECORD, | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to add author tag to the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I missed this class. I added author tag. Thanks 😊
Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
…MessageListenerAdapter Signed-off-by: Chaedong Im <[email protected]>
Signed-off-by: Chaedong Im <[email protected]>
Issue: #3562
Summary
Add a new AckMode
AckMode.RECORD_FILTERED.It will commits offsets only for records that are not filtered by
RecordFilterStrategy.Problem
When RecordFilterStrategy is used with
AckMode.RECORD, the container commits the offset for every record, including those filtered out (never delivered to the listener).This causes unnecessary synchronous commits and can advance offsets for records that were effectively skipped.
Implementation
ContainerProperties.AckMode.RECORD_FILTEREDKafkaMessageListenerContainer: per-record commit path skips filtered recordsFilteringAware+FilteringMessageListenerAdapterto expose filter resultsTests
I wasn’t fully confident about my code, so I hesitated to open this PR for a few weeks. 😅
Please feel free to review it — thank you!