1616
1717package org .springframework .kafka .listener ;
1818
19+ import java .time .Duration ;
20+ import java .util .ArrayList ;
21+ import java .util .HashMap ;
22+ import java .util .List ;
23+ import java .util .Map ;
24+ import java .util .concurrent .CountDownLatch ;
25+ import java .util .concurrent .TimeUnit ;
26+
1927import org .apache .kafka .clients .consumer .Consumer ;
2028import org .apache .kafka .clients .consumer .ConsumerRecord ;
2129import org .apache .kafka .clients .consumer .ConsumerRecords ;
2533import org .springframework .kafka .listener .adapter .FilteringMessageListenerAdapter ;
2634import org .springframework .kafka .listener .adapter .RecordFilterStrategy ;
2735
28- import java .time .Duration ;
29- import java .util .*;
30- import java .util .concurrent .CountDownLatch ;
31- import java .util .concurrent .TimeUnit ;
32-
3336import static org .assertj .core .api .Assertions .assertThat ;
3437import static org .mockito .ArgumentMatchers .any ;
3538import static org .mockito .BDDMockito .given ;
4548 */
4649public class AckModeRecordFilteredTest {
4750
48- @ SuppressWarnings ("unchecked" )
51+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
4952 @ Test
5053 public void testRecordFilteredModeOnlyCommitsProcessedRecords () throws InterruptedException {
5154 // Given: A container with RECORD_FILTERED ack mode
@@ -88,7 +91,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt
8891
8992 given (consumer .poll (any (Duration .class )))
9093 .willReturn (consumerRecords )
91- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
94+ .willReturn (ConsumerRecords . empty ( ));
9295
9396 // When: Start the container and process records
9497 container .start ();
@@ -102,7 +105,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt
102105 verify (consumer , times (2 )).commitSync (any (), any (Duration .class ));
103106 }
104107
105- @ SuppressWarnings ("unchecked" )
108+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
106109 @ Test
107110 public void testRecordFilteredModeWithAllRecordsFiltered () throws InterruptedException {
108111 // Given: All records are filtered
@@ -139,7 +142,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc
139142
140143 given (consumer .poll (any (Duration .class )))
141144 .willReturn (consumerRecords )
142- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
145+ .willReturn (ConsumerRecords . empty ( ));
143146
144147 // When
145148 container .start ();
@@ -150,7 +153,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc
150153 verify (consumer , never ()).commitSync (any (), any (Duration .class ));
151154 }
152155
153- @ SuppressWarnings ("unchecked" )
156+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
154157 @ Test
155158 public void testRecordFilteredModeWithMixedPartitions () throws InterruptedException {
156159 // Given: Mixed partitions with different filtering scenarios
@@ -201,7 +204,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept
201204
202205 given (consumer .poll (any (Duration .class )))
203206 .willReturn (consumerRecords )
204- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
207+ .willReturn (ConsumerRecords . empty ( ));
205208
206209 // When
207210 container .start ();
@@ -213,7 +216,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept
213216 verify (consumer , times (3 )).commitSync (any (), any (Duration .class ));
214217 }
215218
216- @ SuppressWarnings ("unchecked" )
219+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
217220 @ Test
218221 public void testRecordFilteredModeEfficiencyGains () throws InterruptedException {
219222 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
@@ -266,7 +269,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException
266269 verify (consumer , times (1 )).commitSync (any (), any (Duration .class ));
267270 }
268271
269- @ SuppressWarnings ("unchecked" )
272+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
270273 @ Test
271274 public void testRecordFilteredModeDoesNotBreakNormalProcessing () throws InterruptedException {
272275 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
0 commit comments