Skip to content

Commit d613fa3

Browse files
authored
Configure interceptors on container factory (#4080)
This changes allows configuring interceptors on AbstractKafkaListenerContainerFactory in same way as on KafkaMessageListenerContainer, and changes the example docs to show this instead. Fixes #4080 Signed-off-by: Christian Fredriksson <[email protected]>
1 parent 92d48ed commit d613fa3

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,24 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
2222

2323
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
2424

25-
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
25+
Starting with version 4.0, `AbstractKafkaListenerContainerFactory` and `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
2626
If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` or `BatchInterceptor` has already been configured.
2727
The following example shows how to do so:
2828

2929
[source, java]
3030
----
31-
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
31+
public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
3232
CompositeRecordInterceptor compositeInterceptor;
3333
34-
RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
34+
RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
3535
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
3636
compositeInterceptor = interceptor;
3737
} else {
3838
compositeInterceptor = new CompositeRecordInterceptor<>();
39-
container.setRecordInterceptor(compositeInterceptor);
40-
}
41-
42-
if (previousInterceptor != null) {
43-
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
39+
containerFactory.setRecordInterceptor(compositeInterceptor);
40+
if (previousInterceptor != null) {
41+
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
42+
}
4443
}
4544
4645
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
* @author Stephane Nicoll
6262
* @author Gary Russell
6363
* @author Artem Bilan
64+
* @author Christian Fredriksson
6465
*
6566
* @see AbstractMessageListenerContainer
6667
*/
@@ -275,6 +276,15 @@ public ContainerProperties getContainerProperties() {
275276
return this.containerProperties;
276277
}
277278

279+
/**
280+
* Get the {@link RecordInterceptor} for modification, if configured.
281+
* @return the {@link RecordInterceptor}, or {@code null} if not configured
282+
* @since 4.0
283+
*/
284+
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
285+
return this.recordInterceptor;
286+
}
287+
278288
/**
279289
* Set an interceptor to be called before calling the listener.
280290
* Only used with record listeners.
@@ -286,6 +296,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
286296
this.recordInterceptor = recordInterceptor;
287297
}
288298

299+
/**
300+
* Get the {@link BatchInterceptor} for modification, if configured.
301+
* @return the {@link BatchInterceptor}, or {@code null} if not configured
302+
* @since 4.0
303+
*/
304+
public @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
305+
return this.batchInterceptor;
306+
}
307+
289308
/**
290309
* Set a batch interceptor to be called before and after calling the listener.
291310
* Only used with batch listeners.

0 commit comments

Comments
 (0)