Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* @author Kyuhyeok Park
* @author Wang Zhiyang
* @author Choi Wang Gyu
* @author Chaedong Im
*/
public class ContainerProperties extends ConsumerProperties {

Expand All @@ -69,6 +70,16 @@ public enum AckMode {
*/
RECORD,

/**
Copy link
Contributor

@sobychacko sobychacko Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add author tag to the class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed this class. I added author tag. Thanks 😊

* Commit the offset after each record is processed by the listener, but only
* for records that are not filtered out by a {@code RecordFilterStrategy}.
* When a record is filtered (not passed to the listener), no offset commit
* occurs for that record. This mode provides better performance when using
* filtering strategies that filter out a significant portion of records.
* @since 4.0
*/
RECORD_FILTERED,

/**
* Commit the offsets of all records returned by the previous poll after they all
* have been processed by the listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.FilteringAware;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
Expand Down Expand Up @@ -172,6 +173,7 @@
* @author Christian Fredriksson
* @author Timofey Barabanov
* @author Janek Lasocki-Biczysko
* @author Chaedong Im
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -677,6 +679,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean isRecordAck;

private final boolean isRecordFilteredAck;

private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();

private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -871,6 +875,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode);
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
this.isRecordAck = this.ackMode.equals(AckMode.RECORD);
this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED);
boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null;
this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null;
Expand Down Expand Up @@ -933,8 +938,8 @@ else if (listener instanceof MessageListener) {
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck),
"Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener");
if (this.containerProperties.getScheduler() != null) {
this.taskScheduler = this.containerProperties.getScheduler();
this.taskSchedulerExplicitlySet = true;
Expand Down Expand Up @@ -1510,7 +1515,7 @@ protected void handleAsyncFailure() {
}

private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
try {
processCommits();
}
Expand Down Expand Up @@ -2260,7 +2265,7 @@ private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V>
}
getAfterRollbackProcessor().clearThreadState();
}
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
processCommits();
}
}
Expand Down Expand Up @@ -2710,7 +2715,7 @@ private void listenerInfo(final ConsumerRecord<K, V> cRecord) {
}

private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> cRecord) {
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
processCommits();
}
List<ConsumerRecord<?, ?>> list = new ArrayList<>();
Expand Down Expand Up @@ -3060,12 +3065,26 @@ public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
}
}

private boolean isRecordFiltered(ConsumerRecord<K, V> cRecord) {
Object listener = KafkaMessageListenerContainer.this.getContainerProperties().getMessageListener();
if (listener instanceof FilteringAware<?, ?>) {
@SuppressWarnings("unchecked")
FilteringAware<K, V> filteringAware = (FilteringAware<K, V>) listener;
return filteringAware.wasFiltered(cRecord);
}
return false;
}

public void ackCurrent(final ConsumerRecord<K, V> cRecord) {
ackCurrent(cRecord, false);
}

public void ackCurrent(final ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
if (this.isRecordAck && this.producer == null) {
if (this.isRecordFilteredAck && isRecordFiltered(cRecord)) {
return;
}

if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildSingleCommits(cRecord);
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
commitOffsets(offsetsToCommit);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* An interface to indicate that a message listener adapter can report
* whether a record was filtered during processing.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Chaedong Im
* @since 4.0
*/
public interface FilteringAware<K, V> {

/**
* Check if the most recent record processed was filtered out.
* This method should be called after a record has been processed
* to determine if the record was filtered and should not trigger
* an offset commit in RECORD_FILTERED acknowledge mode.
* @param record the record to check
* @return true if the record was filtered, false if it was processed
*/
boolean wasFiltered(ConsumerRecord<K, V> record);

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Chaedong Im
*
*/
public class FilteringMessageListenerAdapter<K, V>
extends AbstractFilteringMessageListener<K, V, MessageListener<K, V>>
implements AcknowledgingConsumerAwareMessageListener<K, V> {
implements AcknowledgingConsumerAwareMessageListener<K, V>, FilteringAware<K, V> {

private final boolean ackDiscarded;

@SuppressWarnings("rawtypes")
private static final ThreadLocal<FilterResult> LAST = new ThreadLocal<>();

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
Expand Down Expand Up @@ -68,7 +72,11 @@ public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {

if (!filter(consumerRecord)) {
boolean filtered = filter(consumerRecord);
LAST.set(new FilterResult<>(consumerRecord, filtered));


if (!filtered) {
switch (this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
Expand All @@ -93,6 +101,17 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) {
}
}

@Override
public boolean wasFiltered(ConsumerRecord<K, V> record) {
@SuppressWarnings("unchecked")
FilterResult<K, V> result = (FilterResult<K, V>) LAST.get();
return result != null
&& result.record.topic().equals(record.topic())
&& result.record.partition() == record.partition()
&& result.record.offset() == record.offset()
&& result.wasFiltered;
}

/*
* Since the container uses the delegate's type to determine which method to call, we
* must implement them all.
Expand All @@ -113,4 +132,16 @@ public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consum
onMessage(data, null, consumer);
}

private static class FilterResult<K, V> {

final ConsumerRecord<K, V> record;

final boolean wasFiltered;

FilterResult(ConsumerRecord<K, V> record, boolean wasFiltered) {
this.record = record;
this.wasFiltered = wasFiltered;
}

}
}
Loading