Skip to content

Commit c8575f8

Browse files
committed
Fix deprecation warnings
* Remove and already deprecated `KafkaConsumer.close(Duration)` method override from the `DefaultKafkaConsumerFactory`. Use `close(CloseOptions option)` instead * Replace a deprecated `org.springframework.context.expression.MapAccessor` with a new `org.springframework.expression.spel.support.MapAccessor` * Fix `DefaultKafkaConsumerFactoryTests` & `RetryTopicConfigurationManualAssignmentIntegrationTests` to not use a deprecated `KafkaConsumer.close(Duration)`, but based on the `CloseOptions.timeout()` instead
1 parent 68e797f commit c8575f8

File tree

4 files changed

+12
-8
lines changed

4 files changed

+12
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.core;
1818

19-
import java.time.Duration;
2019
import java.util.ArrayList;
2120
import java.util.Collections;
2221
import java.util.Enumeration;
@@ -32,6 +31,7 @@
3231
import java.util.function.Supplier;
3332

3433
import org.apache.commons.logging.LogFactory;
34+
import org.apache.kafka.clients.consumer.CloseOptions;
3535
import org.apache.kafka.clients.consumer.Consumer;
3636
import org.apache.kafka.clients.consumer.ConsumerConfig;
3737
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -524,8 +524,8 @@ public void close() {
524524
}
525525

526526
@Override
527-
public void close(Duration timeout) {
528-
super.close(timeout);
527+
public void close(CloseOptions option) {
528+
super.close(option);
529529
notifyConsumerRemoved();
530530
}
531531

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@
4343
import org.jspecify.annotations.Nullable;
4444
import reactor.core.publisher.Mono;
4545

46-
import org.springframework.context.expression.MapAccessor;
4746
import org.springframework.core.MethodParameter;
4847
import org.springframework.core.log.LogAccessor;
4948
import org.springframework.expression.BeanResolver;
5049
import org.springframework.expression.Expression;
5150
import org.springframework.expression.common.LiteralExpression;
5251
import org.springframework.expression.spel.standard.SpelExpressionParser;
52+
import org.springframework.expression.spel.support.MapAccessor;
5353
import org.springframework.expression.spel.support.StandardEvaluationContext;
5454
import org.springframework.expression.spel.support.StandardTypeConverter;
5555
import org.springframework.kafka.core.KafkaTemplate;
@@ -735,8 +735,8 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
735735
try {
736736
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
737737
handleException(request, acknowledgment, consumer, source,
738-
new ListenerExecutionFailedException(createMessagingErrorMessage(
739-
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
738+
new ListenerExecutionFailedException(createMessagingErrorMessage(
739+
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
740740
}
741741
catch (Throwable ex) {
742742
acknowledge(acknowledgment);

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.stream.Collectors;
3232
import java.util.stream.Stream;
3333

34+
import org.apache.kafka.clients.consumer.CloseOptions;
3435
import org.apache.kafka.clients.consumer.Consumer;
3536
import org.apache.kafka.clients.consumer.ConsumerConfig;
3637
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -490,7 +491,7 @@ public void consumerRemoved(String id, Consumer consumer) {
490491
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
491492
assertThat(removals).isEmpty();
492493
if (closeWithTimeout) {
493-
consumer.close(Duration.ofSeconds(10));
494+
consumer.close(CloseOptions.timeout(Duration.ofSeconds(10)));
494495
}
495496
else {
496497
consumer.close();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424

2525
import org.apache.kafka.clients.admin.AdminClientConfig;
26+
import org.apache.kafka.clients.consumer.CloseOptions;
2627
import org.apache.kafka.clients.consumer.Consumer;
2728
import org.apache.kafka.clients.consumer.ConsumerConfig;
2829
import org.apache.kafka.common.PartitionInfo;
@@ -55,6 +56,8 @@
5556

5657
/**
5758
* @author Gary Russell
59+
* @author Artem Bilan
60+
*
5861
* @since 2.7.7
5962
*
6063
*/
@@ -89,7 +92,7 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
8992
assertThat(config.latch.await(120, TimeUnit.SECONDS)).isTrue();
9093
}
9194
finally {
92-
consumer.close(Duration.ofSeconds(10));
95+
consumer.close(CloseOptions.timeout(Duration.ofSeconds(10)));
9396
}
9497
}
9598

0 commit comments

Comments
 (0)