From 8640e72a5b718d28a71fe558d3846fa3bdd76995 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 01:38:30 +0900 Subject: [PATCH 01/12] test: add test for AckMode Record and Record Filtered Signed-off-by: Chaedong Im --- .../kafka/listener/ContainerProperties.java | 10 + .../listener/AckModeRecordFilteredTest.java | 319 ++++++++++++++++++ .../AckModeRecordWithFilteringTest.java | 312 +++++++++++++++++ 3 files changed, 641 insertions(+) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 8135517f66..ba6c290ec7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -69,6 +69,16 @@ public enum AckMode { */ RECORD, + /** + * Commit the offset after each record is processed by the listener, but only + * for records that are not filtered out by a {@code RecordFilterStrategy}. + * When a record is filtered (not passed to the listener), no offset commit + * occurs for that record. This mode provides better performance when using + * filtering strategies that filter out a significant portion of records. + * @since 4.0 + */ + RECORD_FILTERED, + /** * Commit the offsets of all records returned by the previous poll after they all * have been processed by the listener. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java new file mode 100644 index 0000000000..f3b90ab394 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -0,0 +1,319 @@ +/* + * Copyright 2024-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.*; + +/** + * Tests for the RECORD_FILTERED acknowledge mode. + * + * Related to GitHub issue #3562 + * + * @author Chaedong Im + * @see AckModeRecordWithFilteringTest + */ +public class AckModeRecordFilteredTest { + + @Test + public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { + // Given: A container with RECORD_FILTERED ack mode + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> record.offset() % 2 == 0; + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(2); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // Will be filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // Will be processed -> COMMIT offset 2 + new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2"), // Will be filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 0, 3, "key3", "value3") // Will be processed -> COMMIT offset 4 + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When: Start the container and process records + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + // Then: Verify that only odd offset records were processed + assertThat(processedValues).containsExactly("value1", "value3"); + + verify(consumer, times(2)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { + // Given: All records are filtered + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> true; + + List processedValues = new ArrayList<>(); + MessageListener listener = record -> processedValues.add(record.value()); + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // Filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // Filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2") // Filtered -> NO COMMIT + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When + container.start(); + Thread.sleep(1000); + container.stop(); + + assertThat(processedValues).isEmpty(); + verify(consumer, never()).commitSync(any(), any(Duration.class)); + } + + @Test + public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException { + // Given: Mixed partitions with different filtering scenarios + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> + record.value().contains("skip"); + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(3); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp0 = new TopicPartition("test-topic", 0); + TopicPartition tp1 = new TopicPartition("test-topic", 1); + + List> records = List.of( + // Partition 0 + new ConsumerRecord<>("test-topic", 0, 0, "key0", "process1"), // Processed -> COMMIT offset 1 + new ConsumerRecord<>("test-topic", 0, 1, "key1", "skip1"), // Filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 0, 2, "key2", "process2"), // Processed -> COMMIT offset 3 + // Partition 1 + new ConsumerRecord<>("test-topic", 1, 0, "key3", "skip2"), // Filtered -> NO COMMIT + new ConsumerRecord<>("test-topic", 1, 1, "key4", "process3"), // Processed -> COMMIT offset 2 + new ConsumerRecord<>("test-topic", 1, 2, "key5", "skip3") // Filtered -> NO COMMIT + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp0, records.subList(0, 3)); + recordsMap.put(tp1, records.subList(3, 6)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + assertThat(processedValues).containsExactly("process1", "process2", "process3"); + verify(consumer, times(3)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testRecordFilteredModeEfficiencyGains() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> record.offset() % 10 != 0; + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(1); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + records.add(new ConsumerRecord<>("test-topic", 0, i, "key" + i, "value" + i)); + } + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + assertThat(processedValues).hasSize(1); + assertThat(processedValues.get(0)).isEqualTo("value0"); + verify(consumer, times(1)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> false; + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(3); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), + new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), + new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2") + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + // Then: All records processed + assertThat(processedValues).containsExactly("value0", "value1", "value2"); + verify(consumer, times(3)).commitSync(any(), any(Duration.class)); + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java new file mode 100644 index 0000000000..2d1d6e6160 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -0,0 +1,312 @@ +/* + * Copyright 2024-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.*; + +/** + * Tests to verify the behavior of RECORD acknowledge mode when used with filtering strategies. + * + * Related to GitHub issue #3562 + * + * @author Chaedong Im + * @see AckModeRecordFilteredTest + */ +public class AckModeRecordWithFilteringTest { + + @Test + public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException { + // Given: A container with RECORD ack mode and a filter that filters out even offsets + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> record.offset() % 2 == 0; + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(2); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), + new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), + new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2"), + new ConsumerRecord<>("test-topic", 0, 3, "key3", "value3") + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When: Start the container and process records + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + // Then: Verify that only odd offset records were processed + assertThat(processedValues).containsExactly("value1", "value3"); + + verify(consumer, times(4)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testAllRecordsFilteredStillCommits() throws InterruptedException { + // Given: A container where all records are filtered + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> true; + + List processedValues = new ArrayList<>(); + MessageListener listener = record -> processedValues.add(record.value()); + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), + new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1") + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When: Start the container + container.start(); + Thread.sleep(1000); + container.stop(); + + // Then: Verify no records were processed + assertThat(processedValues).isEmpty(); + verify(consumer, times(2)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testMixedPartitionsWithFiltering() throws InterruptedException { + // Given: Multiple partitions with different records + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = + record -> record.value().contains("skip"); + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(3); + + MessageListener listener = record -> { + processedValues.add(record.value()); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp0 = new TopicPartition("test-topic", 0); + TopicPartition tp1 = new TopicPartition("test-topic", 1); + + List> records = List.of( + // Partition 0 + new ConsumerRecord<>("test-topic", 0, 0, "key0", "process_me"), // Will be processed + new ConsumerRecord<>("test-topic", 0, 1, "key1", "skip_me"), // Will be filtered + // Partition 1 + new ConsumerRecord<>("test-topic", 1, 0, "key2", "process_me"), // Will be processed + new ConsumerRecord<>("test-topic", 1, 1, "key3", "skip_me"), // Will be filtered + new ConsumerRecord<>("test-topic", 1, 2, "key4", "process_me") // Will be processed + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp0, records.subList(0, 2)); + recordsMap.put(tp1, records.subList(2, 5)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When: Start container + container.start(); + + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + // Then: Verify correct records were processed + assertThat(processedValues).containsExactly("process_me", "process_me", "process_me"); + verify(consumer, times(5)).commitSync(any(), any(Duration.class)); + } + + @Test + public void testCommitLogging() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.RECORD); + containerProperties.setGroupId("test-group"); + containerProperties.setLogContainerConfig(true); + + RecordFilterStrategy filterStrategy = record -> record.offset() == 0; + + CountDownLatch processedLatch = new CountDownLatch(1); + MessageListener listener = record -> processedLatch.countDown(); + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "filtered"), // Will be filtered + new ConsumerRecord<>("test-topic", 0, 1, "key1", "processed") // Will be processed + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + // When + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + verify(consumer, times(2)).commitSync(anyMap(), any(Duration.class)); + } + + @Test + public void testAckDiscardedParameterBehavior() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL); + containerProperties.setGroupId("test-group"); + + RecordFilterStrategy filterStrategy = record -> record.offset() % 2 == 0; + + List processedValues = new ArrayList<>(); + CountDownLatch processedLatch = new CountDownLatch(1); + + AcknowledgingMessageListener listener = (record, ack) -> { + processedValues.add(record.value()); + ack.acknowledge(); + processedLatch.countDown(); + }; + + FilteringMessageListenerAdapter filteringAdapter = + new FilteringMessageListenerAdapter<>(listener, filterStrategy, true); + containerProperties.setMessageListener(filteringAdapter); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + TopicPartition tp = new TopicPartition("test-topic", 0); + List> records = List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key0", "filtered"), // Will be filtered but acked + new ConsumerRecord<>("test-topic", 0, 1, "key1", "processed") // Will be processed and acked + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(tp, records); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + given(consumer.poll(any(Duration.class))) + .willReturn(consumerRecords) + .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + + container.start(); + assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(500); + container.stop(); + + assertThat(processedValues).containsExactly("processed"); + } +} From 84aa033b43ece7503cc8c16443daf5f235e9500e Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 01:39:30 +0900 Subject: [PATCH 02/12] feat: implement AckMode RECORD_FILTERED Signed-off-by: Chaedong Im --- .../KafkaMessageListenerContainer.java | 31 ++++++++++--- .../listener/adapter/FilteringAware.java | 43 +++++++++++++++++++ .../FilteringMessageListenerAdapter.java | 33 +++++++++++++- 3 files changed, 99 insertions(+), 8 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f611178907..afe3354f15 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -111,6 +111,7 @@ import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.kafka.listener.adapter.AsyncRepliesAware; +import org.springframework.kafka.listener.adapter.FilteringAware; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; @@ -172,6 +173,7 @@ * @author Christian Fredriksson * @author Timofey Barabanov * @author Janek Lasocki-Biczysko + * @author Chaedong Im */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -677,6 +679,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean isRecordAck; + private final boolean isRecordFilteredAck; + private final BlockingQueue> acks = new LinkedBlockingQueue<>(); private final BlockingQueue seeks = new LinkedBlockingQueue<>(); @@ -871,6 +875,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode); this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; this.isRecordAck = this.ackMode.equals(AckMode.RECORD); + this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED); boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies; this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null; this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null; @@ -933,8 +938,8 @@ else if (listener instanceof MessageListener) { this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals(ListenerType.CONSUMER_AWARE); this.commonErrorHandler = determineCommonErrorHandler(); - Assert.state(!this.isBatchListener || !this.isRecordAck, - "Cannot use AckMode.RECORD with a batch listener"); + Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck), + "Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener"); if (this.containerProperties.getScheduler() != null) { this.taskScheduler = this.containerProperties.getScheduler(); this.taskSchedulerExplicitlySet = true; @@ -1510,7 +1515,7 @@ protected void handleAsyncFailure() { } private void doProcessCommits() { - if (!this.autoCommit && !this.isRecordAck) { + if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) { try { processCommits(); } @@ -2260,7 +2265,7 @@ private List> createRecordList(final ConsumerRecords } getAfterRollbackProcessor().clearThreadState(); } - if (!this.autoCommit && !this.isRecordAck) { + if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) { processCommits(); } } @@ -2710,7 +2715,7 @@ private void listenerInfo(final ConsumerRecord cRecord) { } private void handleNack(final ConsumerRecords records, final ConsumerRecord cRecord) { - if (!this.autoCommit && !this.isRecordAck) { + if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) { processCommits(); } List> list = new ArrayList<>(); @@ -3060,12 +3065,26 @@ public void checkDeser(final ConsumerRecord cRecord, String headerName) { } } + private boolean isRecordFiltered(ConsumerRecord cRecord) { + Object listener = KafkaMessageListenerContainer.this.getContainerProperties().getMessageListener(); + if (listener instanceof FilteringAware) { + @SuppressWarnings("unchecked") + FilteringAware filteringAware = (FilteringAware) listener; + return filteringAware.wasFiltered(cRecord); + } + return false; + } + public void ackCurrent(final ConsumerRecord cRecord) { ackCurrent(cRecord, false); } public void ackCurrent(final ConsumerRecord cRecord, boolean commitRecovered) { - if (this.isRecordAck && this.producer == null) { + if (this.isRecordFilteredAck && isRecordFiltered(cRecord)) { + return; + } + + if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) { Map offsetsToCommit = buildSingleCommits(cRecord); this.commitLogger.log(() -> COMMITTING + offsetsToCommit); commitOffsets(offsetsToCommit); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java new file mode 100644 index 0000000000..524c5d8da5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * An interface to indicate that a message listener adapter can report + * whether a record was filtered during processing. + * + * @param the key type. + * @param the value type. + * + * @author Chaedong Im + * @since 4.0 + */ +public interface FilteringAware { + + /** + * Check if the most recent record processed was filtered out. + * This method should be called after a record has been processed + * to determine if the record was filtered and should not trigger + * an offset commit in RECORD_FILTERED acknowledge mode. + * @param record the record to check + * @return true if the record was filtered, false if it was processed + */ + boolean wasFiltered(ConsumerRecord record); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java index b17c24d0e9..5e593b56c3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java @@ -16,6 +16,8 @@ package org.springframework.kafka.listener.adapter; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.jspecify.annotations.Nullable; @@ -32,14 +34,30 @@ * @param the value type. * * @author Gary Russell + * @author Chaedong Im * */ public class FilteringMessageListenerAdapter extends AbstractFilteringMessageListener> - implements AcknowledgingConsumerAwareMessageListener { + implements AcknowledgingConsumerAwareMessageListener, FilteringAware { + + private static class FilterResult { + + final ConsumerRecord record; + + final boolean wasFiltered; + + FilterResult(ConsumerRecord record, boolean wasFiltered) { + this.record = record; + this.wasFiltered = wasFiltered; + } + + } private final boolean ackDiscarded; + private final AtomicReference<@Nullable FilterResult> lastResult = new AtomicReference<>(); + /** * Create an instance with the supplied strategy and delegate listener. * @param delegate the delegate. @@ -68,7 +86,12 @@ public FilteringMessageListenerAdapter(MessageListener delegate, public void onMessage(ConsumerRecord consumerRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer) { - if (!filter(consumerRecord)) { + boolean filtered = filter(consumerRecord); + + // Atomically update both the record and its filtered state together + this.lastResult.set(new FilterResult<>(consumerRecord, filtered)); + + if (!filtered) { switch (this.delegateType) { case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer); case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment); @@ -93,6 +116,12 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) { } } + @Override + public boolean wasFiltered(ConsumerRecord record) { + FilterResult result = this.lastResult.get(); + return result != null && result.record == record && result.wasFiltered; + } + /* * Since the container uses the delegate's type to determine which method to call, we * must implement them all. From 27d7eb72dcef646124e205d56d5514766e20303d Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:01:54 +0900 Subject: [PATCH 03/12] docs: fix year of the Copyright Signed-off-by: Chaedong Im --- .../springframework/kafka/listener/adapter/FilteringAware.java | 2 +- .../kafka/listener/AckModeRecordFilteredTest.java | 2 +- .../kafka/listener/AckModeRecordWithFilteringTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java index 524c5d8da5..74f869e877 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2024-present the original author or authors. + * Copyright 2025-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index f3b90ab394..acce9ac807 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024-present the original author or authors. + * Copyright 2025-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java index 2d1d6e6160..3e8e723dba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024-present the original author or authors. + * Copyright 2025-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 0458aa1e6bcf844318fbe708424737acccaa73d8 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:17:05 +0900 Subject: [PATCH 04/12] refactor: move inner class to the end of the class Signed-off-by: Chaedong Im --- .../FilteringMessageListenerAdapter.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java index 5e593b56c3..78c4f7d2fc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java @@ -41,18 +41,6 @@ public class FilteringMessageListenerAdapter extends AbstractFilteringMessageListener> implements AcknowledgingConsumerAwareMessageListener, FilteringAware { - private static class FilterResult { - - final ConsumerRecord record; - - final boolean wasFiltered; - - FilterResult(ConsumerRecord record, boolean wasFiltered) { - this.record = record; - this.wasFiltered = wasFiltered; - } - - } private final boolean ackDiscarded; @@ -142,4 +130,16 @@ public void onMessage(ConsumerRecord data, @Nullable Consumer consum onMessage(data, null, consumer); } + private static class FilterResult { + + final ConsumerRecord record; + + final boolean wasFiltered; + + FilterResult(ConsumerRecord record, boolean wasFiltered) { + this.record = record; + this.wasFiltered = wasFiltered; + } + + } } From 14cceef83388654b603a8a0abdae838d0f2f398d Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:24:19 +0900 Subject: [PATCH 05/12] refactor: remove newline Signed-off-by: Chaedong Im --- .../kafka/listener/adapter/FilteringMessageListenerAdapter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java index 78c4f7d2fc..8e5e8b63a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java @@ -41,7 +41,6 @@ public class FilteringMessageListenerAdapter extends AbstractFilteringMessageListener> implements AcknowledgingConsumerAwareMessageListener, FilteringAware { - private final boolean ackDiscarded; private final AtomicReference<@Nullable FilterResult> lastResult = new AtomicReference<>(); From c552c92882a858f36e00816e49dbab2bfc195e60 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:25:03 +0900 Subject: [PATCH 06/12] refactor: add suppress warnings for unchecked exception Signed-off-by: Chaedong Im --- .../kafka/listener/AckModeRecordFilteredTest.java | 5 +++++ .../kafka/listener/AckModeRecordWithFilteringTest.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index acce9ac807..7947460df1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -47,6 +47,7 @@ */ public class AckModeRecordFilteredTest { + @SuppressWarnings("unchecked") @Test public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { // Given: A container with RECORD_FILTERED ack mode @@ -103,6 +104,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt verify(consumer, times(2)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { // Given: All records are filtered @@ -150,6 +152,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc verify(consumer, never()).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException { // Given: Mixed partitions with different filtering scenarios @@ -212,6 +215,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept verify(consumer, times(3)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testRecordFilteredModeEfficiencyGains() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -264,6 +268,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException verify(consumer, times(1)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java index 3e8e723dba..aa949d0daf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -48,6 +48,7 @@ */ public class AckModeRecordWithFilteringTest { + @SuppressWarnings("unchecked") @Test public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException { // Given: A container with RECORD ack mode and a filter that filters out even offsets @@ -104,6 +105,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException verify(consumer, times(4)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testAllRecordsFilteredStillCommits() throws InterruptedException { // Given: A container where all records are filtered @@ -151,6 +153,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException { verify(consumer, times(2)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testMixedPartitionsWithFiltering() throws InterruptedException { // Given: Multiple partitions with different records @@ -214,6 +217,7 @@ record -> record.value().contains("skip"); verify(consumer, times(5)).commitSync(any(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testCommitLogging() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -260,6 +264,7 @@ public void testCommitLogging() throws InterruptedException { verify(consumer, times(2)).commitSync(anyMap(), any(Duration.class)); } + @SuppressWarnings("unchecked") @Test public void testAckDiscardedParameterBehavior() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); From df865122918fd50ab65470f457fe17da4a3bbd89 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:28:39 +0900 Subject: [PATCH 07/12] refactor: remove unused import Signed-off-by: Chaedong Im --- .../kafka/listener/AckModeRecordFilteredTest.java | 2 -- .../kafka/listener/AckModeRecordWithFilteringTest.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index 7947460df1..f44fd6d51d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -19,10 +19,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java index aa949d0daf..3fdf7c8047 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -19,10 +19,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; From 47a728f92837d549bf9e90d96854bdeeab09a609 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 02:30:12 +0900 Subject: [PATCH 08/12] refactor: replace deprecated code Signed-off-by: Chaedong Im --- .../listener/AckModeRecordFilteredTest.java | 39 +++++++++++-------- .../AckModeRecordWithFilteringTest.java | 38 ++++++++++-------- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index f44fd6d51d..5177e55567 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -16,24 +16,31 @@ package org.springframework.kafka.listener; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; + import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests for the RECORD_FILTERED acknowledge mode. @@ -45,7 +52,7 @@ */ public class AckModeRecordFilteredTest { - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { // Given: A container with RECORD_FILTERED ack mode @@ -88,7 +95,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When: Start the container and process records container.start(); @@ -102,7 +109,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt verify(consumer, times(2)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { // Given: All records are filtered @@ -139,7 +146,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When container.start(); @@ -150,7 +157,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc verify(consumer, never()).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException { // Given: Mixed partitions with different filtering scenarios @@ -201,7 +208,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When container.start(); @@ -213,7 +220,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept verify(consumer, times(3)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testRecordFilteredModeEfficiencyGains() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -253,7 +260,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When container.start(); @@ -266,7 +273,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException verify(consumer, times(1)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -307,7 +314,7 @@ public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws Interrup given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When container.start(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java index 3fdf7c8047..d073eea27a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -16,25 +16,31 @@ package org.springframework.kafka.listener; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; + import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests to verify the behavior of RECORD acknowledge mode when used with filtering strategies. @@ -46,7 +52,7 @@ */ public class AckModeRecordWithFilteringTest { - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException { // Given: A container with RECORD ack mode and a filter that filters out even offsets @@ -89,7 +95,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When: Start the container and process records container.start(); @@ -103,7 +109,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException verify(consumer, times(4)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testAllRecordsFilteredStillCommits() throws InterruptedException { // Given: A container where all records are filtered @@ -139,7 +145,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException { given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When: Start the container container.start(); @@ -151,7 +157,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException { verify(consumer, times(2)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testMixedPartitionsWithFiltering() throws InterruptedException { // Given: Multiple partitions with different records @@ -201,7 +207,7 @@ record -> record.value().contains("skip"); given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When: Start container container.start(); @@ -215,7 +221,7 @@ record -> record.value().contains("skip"); verify(consumer, times(5)).commitSync(any(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testCommitLogging() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -251,7 +257,7 @@ public void testCommitLogging() throws InterruptedException { given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); // When container.start(); @@ -262,7 +268,7 @@ public void testCommitLogging() throws InterruptedException { verify(consumer, times(2)).commitSync(anyMap(), any(Duration.class)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testAckDiscardedParameterBehavior() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -303,7 +309,7 @@ public void testAckDiscardedParameterBehavior() throws InterruptedException { given(consumer.poll(any(Duration.class))) .willReturn(consumerRecords) - .willReturn(new ConsumerRecords<>(Collections.emptyMap())); + .willReturn(ConsumerRecords.empty()); container.start(); assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue(); From e32efea859e8df3acc77f772d94f33f3179a11b9 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 08:44:40 +0900 Subject: [PATCH 09/12] refactor: remove public modifier in the test code Signed-off-by: Chaedong Im --- .../kafka/listener/AckModeRecordFilteredTest.java | 12 ++++++------ .../listener/AckModeRecordWithFilteringTest.java | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index 5177e55567..c54a1b4d2a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -50,11 +50,11 @@ * @author Chaedong Im * @see AckModeRecordWithFilteringTest */ -public class AckModeRecordFilteredTest { +class AckModeRecordFilteredTest { @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { + void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { // Given: A container with RECORD_FILTERED ack mode ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -111,7 +111,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { + void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { // Given: All records are filtered ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -159,7 +159,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException { + void testRecordFilteredModeWithMixedPartitions() throws InterruptedException { // Given: Mixed partitions with different filtering scenarios ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -222,7 +222,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testRecordFilteredModeEfficiencyGains() throws InterruptedException { + void testRecordFilteredModeEfficiencyGains() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); @@ -275,7 +275,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException { + void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java index d073eea27a..a47698f408 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java @@ -50,11 +50,11 @@ * @author Chaedong Im * @see AckModeRecordFilteredTest */ -public class AckModeRecordWithFilteringTest { +class AckModeRecordWithFilteringTest { @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException { + void testCurrentRecordModeCommitsAllRecords() throws InterruptedException { // Given: A container with RECORD ack mode and a filter that filters out even offsets ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -111,7 +111,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testAllRecordsFilteredStillCommits() throws InterruptedException { + void testAllRecordsFilteredStillCommits() throws InterruptedException { // Given: A container where all records are filtered ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -159,7 +159,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException { @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testMixedPartitionsWithFiltering() throws InterruptedException { + void testMixedPartitionsWithFiltering() throws InterruptedException { // Given: Multiple partitions with different records ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -223,7 +223,7 @@ record -> record.value().contains("skip"); @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testCommitLogging() throws InterruptedException { + void testCommitLogging() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); @@ -270,7 +270,7 @@ public void testCommitLogging() throws InterruptedException { @SuppressWarnings({"unchecked", "deprecation"}) @Test - public void testAckDiscardedParameterBehavior() throws InterruptedException { + void testAckDiscardedParameterBehavior() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer); From 507ddbca6629f637e41f0d309014f68dd4d83d12 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 08:45:10 +0900 Subject: [PATCH 10/12] docs: add @author Signed-off-by: Chaedong Im --- .../org/springframework/kafka/listener/ContainerProperties.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index ba6c290ec7..6900d3fac4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -56,6 +56,7 @@ * @author Kyuhyeok Park * @author Wang Zhiyang * @author Choi Wang Gyu + * @author Chaedong Im */ public class ContainerProperties extends ConsumerProperties { From 4e8b8dda80b9255ff77067e6fe903de4caaf12a9 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 09:33:17 +0900 Subject: [PATCH 11/12] fix: use ThreadLocal to isolate filter result per thread in FilteringMessageListenerAdapter Signed-off-by: Chaedong Im --- .../FilteringMessageListenerAdapter.java | 17 +++-- .../listener/AckModeRecordFilteredTest.java | 74 +++++++++++++++++++ 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java index 8e5e8b63a5..e9e86228a1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java @@ -16,8 +16,6 @@ package org.springframework.kafka.listener.adapter; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.jspecify.annotations.Nullable; @@ -43,7 +41,8 @@ public class FilteringMessageListenerAdapter private final boolean ackDiscarded; - private final AtomicReference<@Nullable FilterResult> lastResult = new AtomicReference<>(); + @SuppressWarnings("rawtypes") + private static final ThreadLocal LAST = new ThreadLocal<>(); /** * Create an instance with the supplied strategy and delegate listener. @@ -74,9 +73,8 @@ public void onMessage(ConsumerRecord consumerRecord, @Nullable Acknowledgm @Nullable Consumer consumer) { boolean filtered = filter(consumerRecord); + LAST.set(new FilterResult<>(consumerRecord, filtered)); - // Atomically update both the record and its filtered state together - this.lastResult.set(new FilterResult<>(consumerRecord, filtered)); if (!filtered) { switch (this.delegateType) { @@ -105,8 +103,13 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) { @Override public boolean wasFiltered(ConsumerRecord record) { - FilterResult result = this.lastResult.get(); - return result != null && result.record == record && result.wasFiltered; + @SuppressWarnings("unchecked") + FilterResult result = (FilterResult) LAST.get(); + return result != null + && result.record.topic().equals(record.topic()) + && result.record.partition() == record.partition() + && result.record.offset() == record.offset() + && result.wasFiltered; } /* diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index c54a1b4d2a..188d954e79 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -33,6 +33,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.kafka.support.Acknowledgment; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -326,4 +327,77 @@ void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedExce assertThat(processedValues).containsExactly("value0", "value1", "value2"); verify(consumer, times(3)).commitSync(any(), any(Duration.class)); } + + @SuppressWarnings({"unchecked", "deprecation"}) + @Test + void recordFilteredModeShouldBeThreadIsolated() throws Exception { + ConsumerFactory cf = mock(ConsumerFactory.class); + Consumer c0 = mock(Consumer.class); + Consumer c1 = mock(Consumer.class); + given(cf.createConsumer(any(), any(), any(), any())).willReturn(c0, c1); + + ContainerProperties props = new ContainerProperties("iso-topic"); + props.setGroupId("iso-group"); + props.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED); + + CountDownLatch aHasSetState = new CountDownLatch(1); + CountDownLatch bHasProcessed = new CountDownLatch(1); + RecordFilterStrategy filter = rec -> rec.offset() == 0; + + FilteringMessageListenerAdapter adapter = + new FilteringMessageListenerAdapter<>( + (MessageListener) r -> { + }, + filter + ) { + @Override + public void onMessage(ConsumerRecord rec, + Acknowledgment ack, + Consumer consumer) { + super.onMessage(rec, ack, consumer); + if (rec.offset() == 0) { + aHasSetState.countDown(); + try { + bHasProcessed.await(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else if (rec.offset() == 1) { + try { + aHasSetState.await(200, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + bHasProcessed.countDown(); + } + } + }; + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, props); + container.setConcurrency(2); + container.setupMessageListener(adapter); + + TopicPartition tp0 = new TopicPartition("iso-topic", 0); + TopicPartition tp1 = new TopicPartition("iso-topic", 1); + + ConsumerRecords poll0 = new ConsumerRecords<>(Map.of( + tp0, List.of(new ConsumerRecord<>("iso-topic", 0, 0, "k0", "v0")) + )); + ConsumerRecords poll1 = new ConsumerRecords<>(Map.of( + tp1, List.of(new ConsumerRecord<>("iso-topic", 1, 1, "k1", "v1")) + )); + + given(c0.poll(any(Duration.class))).willReturn(poll0).willReturn(ConsumerRecords.empty()); + given(c1.poll(any(Duration.class))).willReturn(poll1).willReturn(ConsumerRecords.empty()); + + // when: containers process records concurrently (thread-local isolation should apply) + container.start(); + Thread.sleep(400); + container.stop(); + + // then: consumer c1 commits only its record, while c0 (filtered) does not + verify(c1, times(1)).commitSync(any(), any(Duration.class)); + verify(c0, never()).commitSync(any(), any(Duration.class)); + } } From 0beec0c956f78c33a2b16836891230a2a8dbccc8 Mon Sep 17 00:00:00 2001 From: Chaedong Im Date: Fri, 17 Oct 2025 10:00:43 +0900 Subject: [PATCH 12/12] style: format code Signed-off-by: Chaedong Im --- .../listener/AckModeRecordFilteredTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java index 188d954e79..f8998d4e7c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java @@ -345,27 +345,25 @@ void recordFilteredModeShouldBeThreadIsolated() throws Exception { RecordFilterStrategy filter = rec -> rec.offset() == 0; FilteringMessageListenerAdapter adapter = - new FilteringMessageListenerAdapter<>( - (MessageListener) r -> { - }, - filter - ) { + new FilteringMessageListenerAdapter<>((MessageListener) r -> { + }, filter) { @Override - public void onMessage(ConsumerRecord rec, - Acknowledgment ack, - Consumer consumer) { + public void onMessage(ConsumerRecord rec, Acknowledgment ack, Consumer consumer) { super.onMessage(rec, ack, consumer); if (rec.offset() == 0) { aHasSetState.countDown(); try { bHasProcessed.await(500, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); } - } else if (rec.offset() == 1) { + } + else if (rec.offset() == 1) { try { aHasSetState.await(200, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); } bHasProcessed.countDown();