Skip to content

Commit a4251a9

Browse files
Igor Macedo Quintanilhaigormq
authored andcommitted
support per-record observations in batch listeners
Signed-off-by: Igor Macedo Quintanilha <[email protected]>
1 parent 6425682 commit a4251a9

File tree

6 files changed

+457
-4
lines changed

6 files changed

+457
-4
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,28 @@ The `record` property in both observation contexts contains the `ConsumerRecord`
119119
The sender and receiver contexts `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
120120
If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into ``KafkaTemplate``s and listener containers.
121121
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.
122+
123+
[[batch-listener-obs]]
124+
=== Batch Listener Observations
125+
126+
When using a batch listener, by default, no observations are created, even if a `ObservationRegistry` is present.
127+
This is because the scope of an observation is tied to the thread, and with a batch listener, there is no one-to-one mapping between an observation and a record.
128+
129+
To enable per-record observations in a batch listener, set the container factory property `recordObservationsInBatch` to `true`.
130+
131+
[source,java]
132+
----
133+
@Bean
134+
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
135+
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
136+
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
137+
138+
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
139+
configurer.configure(factory, kafkaConsumerFactory);
140+
factory.getContainerProperties().setRecordObservationsInBatch(true);
141+
return factory;
142+
}
143+
----
144+
145+
When this property is `true`, an observation will be created for each record in the batch, but the observation is not propagated to the listener method.
146+
This allows you to have visibility into the processing of each record, even within a batch context.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
7676

7777
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
7878
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
79+
80+
[[x40-batch-observability]]
81+
=== Per-Record Observation in Batch Listeners
82+
83+
It is now possible to get an observation for each record when using a batch listener.
84+
See xref:kafka/micrometer.adoc#batch-listener-obs[Observability for Batch Listeners] for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ public enum EOSMode {
310310

311311
private boolean restartAfterAuthExceptions;
312312

313+
private boolean recordObservationsInBatch;
314+
313315
/**
314316
* Create properties for a container that will subscribe to the specified topics.
315317
* @param topics the topics.
@@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
10911093
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
10921094
}
10931095

1096+
/**
1097+
* When true, and a batch listener is configured with observation enabled, an observation
1098+
* will be started for each record in the batch.
1099+
* @return recordObservationsInBatch.
1100+
* @since 4.0
1101+
*/
1102+
public boolean isRecordObservationsInBatch() {
1103+
return this.recordObservationsInBatch;
1104+
}
1105+
1106+
/**
1107+
* Set whether to enable individual record observations in a batch.
1108+
* When true, and a batch listener is configured with observation enabled, an observation
1109+
* will be started for each record in the batch. Default false.
1110+
* @param recordObservationsInBatch true to enable individual record observations.
1111+
* @since 4.0
1112+
*/
1113+
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
1114+
this.recordObservationsInBatch = recordObservationsInBatch;
1115+
}
1116+
10941117
@Override
10951118
public String toString() {
10961119
return "ContainerProperties ["
@@ -1141,6 +1164,7 @@ public String toString() {
11411164
? "\n observationRegistry=" + this.observationRegistry
11421165
: "")
11431166
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
1167+
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
11441168
+ "\n]";
11451169
}
11461170

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
898898
this.isBatchListener = true;
899899
this.wantsFullRecords = this.batchListener.wantsPollResult();
900900
this.pollThreadStateProcessor = setUpPollProcessor(true);
901-
this.observationEnabled = false;
901+
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
902902
}
903903
else if (listener instanceof MessageListener) {
904904
this.listener = (MessageListener<K, V>) listener;
@@ -2423,6 +2423,21 @@ private void ackBatch(final ConsumerRecords<K, V> records) throws InterruptedExc
24232423
}
24242424
}
24252425

2426+
private void invokeBatchWithIndividualRecordObservation(List<ConsumerRecord<K, V>> recordList) {
2427+
// Create individual observations for each record without scopes
2428+
for (ConsumerRecord<K, V> record : recordList) {
2429+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
2430+
this.containerProperties.getObservationConvention(),
2431+
DefaultKafkaListenerObservationConvention.INSTANCE,
2432+
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
2433+
this::clusterId),
2434+
this.observationRegistry);
2435+
observation.observe(() -> {
2436+
this.logger.debug(() -> "Observing record in batch: " + KafkaUtils.format(record));
2437+
});
2438+
}
2439+
}
2440+
24262441
private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
24272442
List<ConsumerRecord<K, V>> recordListArg) {
24282443

@@ -2443,7 +2458,13 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24432458
}
24442459
}
24452460
Object sample = startMicrometerSample();
2461+
2462+
24462463
try {
2464+
if (this.observationEnabled) {
2465+
invokeBatchWithIndividualRecordObservation(recordList);
2466+
}
2467+
24472468
if (this.wantsFullRecords) {
24482469
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
24492470
this.isAnyManualAck

0 commit comments

Comments
 (0)