Skip to content

Commit 7e64584

Browse files
authored
Add comprehensive acknowledgment support for Kafka share consumers
* Add comprehensive acknowledgment support for Kafka share consumers Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types. - Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support - Add ShareAcknowledgmentException for acknowledgment failures - Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties - Add poll-level acknowledgment constraints in explicit mode - Add ShareConsumerAwareMessageListener for ShareConsumer access - Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment - Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration - Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation - Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking - Automatic error handling with REJECT acknowledgment on exceptions - Poll blocking in explicit mode until all records acknowledged - Support for mixed acknowledgment patterns within single poll - Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints - Validation preventing batch listeners with share consumers - Factory-level and container-level acknowledgment mode configuration - Message converter extensions for ShareAcknowledgment parameter injection - Comprehensive integration tests covering all acknowledgment scenarios - Constraint tests validating poll-level acknowledgment requirements - Unit tests for container behavior and listener dispatching - Updated documentation with acknowledgment examples - Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error - Explicit: Application must manually acknowledge each record before next poll - Explicit mode blocks subsequent polls until all records acknowledged - Prevents message loss and ensures proper acknowledgment ordering - Concurrent acknowledgment attempts properly handled with IllegalStateException - Processing exceptions trigger automatic REJECT acknowledgment - Acknowledgment failures reset state and throw ShareAcknowledgmentException - Container continues processing after individual record failures - ShareAcknowledgment parameter injection follows existing Spring Kafka patterns - Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts - Factory and container level configuration options provide flexibility This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior. * Refactor share consumer acknowledgment interfaces and improve code quality - Simplify to single AcknowledgingShareConsumerAwareMessageListener interface with nullable acknowledgment - Remove redundant ShareConsumerAwareMessageListener interface - Clean up ShareAcknowledgmentMode enum by removing unnecessary string property - Replace verbose null checks with Boolean.TRUE.equals() pattern - Use enum values for acknowledgment mode validation instead of hardcoded strings - Update tests and documentation to use unified interface - Fix spacing issues in @nullable annotations - Fixing other formatting issues Improve share consumer polling behavior and add isShareConsumer() helper - Add isShareConsumer() helper method to AbstractKafkaListenerEndpoint for cleaner boolean checks - Replace verbose Boolean.TRUE.equals() pattern with cleaner isShareConsumer() call - Handle KIP-932 IllegalStateException when polling with unacknowledged records in explicit mode - Add minimal 10ms delay to prevent tight loop while maintaining responsiveness - Remove problematic pre-poll blocking logic that prevented proper exception handling The share consumer now properly handles the broker's IllegalStateException when attempting to poll with unacknowledged records, as specified in KIP-932. This maintains heartbeat while waiting for acknowledgments and prevents CPU-intensive tight loops. Fix thread safety in ShareConsumer acknowledgments ShareConsumer is not thread-safe and requires all access to happen on the consumer thread. The previous implementation allowed acknowledgment calls from listener threads to directly access the consumer, causing ConcurrentModificationException. Changes: - Add PendingAcknowledgment queue to safely pass acknowledgments between threads - Process queued acknowledgments on the consumer thread during poll loop - Remove direct consumer access from ShareConsumerAcknowledgment.acknowledgeInternal() - Add notifyAcknowledged() callback for acknowledgment completion This ensures all ShareConsumer interactions happen on the owning consumer thread, eliminating race conditions between polling and acknowledgment operations. The thread safety issue was exposed by removing the pre-poll sleep, which previously masked the concurrent access by creating timing windows where the consumer was dormant. * Simplify share consumer acknowledgment configuration and consolidate message converters - Replace ShareAcknowledgmentMode enum with boolean explicitShareAcknowledgment field - Consolidate duplicate toMessage/toShareMessage methods into single toMessage with Object parameters - Merge invoke/invokeHandler methods in MessagingMessageListenerAdapter to use Object parameters - Eliminate duplicate commonHeaders methods in MessageConverter interface - Add validation for explicit acknowledgment mode requiring proper listener interface - Simplify ShareKafkaMessageListenerContainerUnitTests to focus on configuration validation - Other formatting fixes in docs and javadocs * Improve logging and test reliability for share consumer implementation - Replace Thread.sleep() with CountDownLatch assertions in tests for reliable synchronization and faster test execution - Use LogMessage.format() instead of Supplier with String.format() for consistent logging patterns - Fix lambda expression to use effectively final variable in error logging - Change shareConsumer field from nullable Boolean to primitive boolean in AbstractKafkaListenerEndpoint for cleaner null handling - Make utility methods static where appropriate for better code organization * - Revert latch-based negative assertions back to Thread.sleep() in constraint and integration tests, as testing for non-occurrence (await expecting false) still blocks for full timeout duration - Use chained assertion style for acknowledgment validations * Consolidate share consumer constraint tests into integration tests - Replace Thread.sleep() with Awaitility pattern that verifies poll blocking via trace log - Add Mockito spy on LogAccessor to confirm the logging message - Migrate shouldHandlePartialAcknowledgmentCorrectly test from ConstraintTests - Migrate shouldHandleConcurrentAcknowledgmentAttempts test from ConstraintTests - Fix topic collision by assigning unique topics to each test - Remove duplicate ShareKafkaMessageListenerContainerConstraintTests class This improves test reliability by verifying actual blocking behavior rather than assuming Thread.sleep duration is sufficient, and eliminates test duplication. * Refactor ShareKafkaListenerContainerFactory and fix compilation warnings - Change autoStartup and phase from nullable wrappers to primitives with defaults - Add @SuppressWarnings(NullAway.Init) to applicationEventPublisher and applicationContext - Use ShareAcknowledgementMode.fromString() API instead of manual string parsing - Use ArgumentMatchers with explicit type parameters to eliminate unchecked warnings in tests - Remove deprecated imports from ShareRecordMessagingMessageListenerAdapter - Use FQCN for JacksonProjectingMessageConverter and ProjectingMessageConverter - Fix syntax error in kafka-queues.adoc documentation example Signed-off-by: Soby Chacko <[email protected]>
1 parent 17fe0ff commit 7e64584

18 files changed

+2309
-142
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 278 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -201,25 +201,6 @@ public class ShareMessageListener {
201201
}
202202
----
203203

204-
[[share-group-configuration]]
205-
== Share Group Configuration
206-
207-
Share groups require specific broker configuration to function properly.
208-
For testing with embedded Kafka, use:
209-
210-
[source,java]
211-
----
212-
@EmbeddedKafka(
213-
topics = {"my-queue-topic"},
214-
brokerProperties = {
215-
"unstable.api.versions.enable=true",
216-
"group.coordinator.rebalance.protocols=classic,share",
217-
"share.coordinator.state.topic.replication.factor=1",
218-
"share.coordinator.state.topic.min.isr=1"
219-
}
220-
)
221-
----
222-
223204
[[share-group-offset-reset]]
224205
=== Share Group Offset Reset
225206

@@ -248,8 +229,277 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws
248229
[[share-record-acknowledgment]]
249230
== Record Acknowledgment
250231

251-
Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing.
252-
More sophisticated acknowledgment patterns will be added in future versions.
232+
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
233+
234+
[[share-implicit-acknowledgment]]
235+
=== Implicit Acknowledgment (Default)
236+
237+
In implicit mode, records are automatically acknowledged based on processing outcome:
238+
239+
Successful processing: Records are acknowledged as `ACCEPT`
240+
Processing errors: Records are acknowledged as `REJECT`
241+
242+
[source,java]
243+
----
244+
@Bean
245+
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
246+
ShareConsumerFactory<String, String> shareConsumerFactory) {
247+
ShareKafkaListenerContainerFactory<String, String> factory =
248+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
249+
// Implicit mode is the default (false means implicit acknowledgment)
250+
factory.getContainerProperties().setExplicitShareAcknowledgment(false);
251+
252+
return factory;
253+
}
254+
----
255+
256+
[[share-explicit-acknowledgment]]
257+
=== Explicit Acknowledgment
258+
259+
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
260+
261+
There are two ways to configure explicit acknowledgment mode:
262+
263+
==== Option 1: Using Kafka Client Configuration
264+
265+
[source,java]
266+
----
267+
@Bean
268+
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
269+
Map<String, Object> props = new HashMap<>();
270+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
271+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
272+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
273+
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
274+
return new DefaultShareConsumerFactory<>(props);
275+
}
276+
----
277+
278+
==== Option 2: Using Spring Container Configuration
279+
280+
[source,java]
281+
----
282+
@Bean
283+
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
284+
ShareConsumerFactory<String, String> shareConsumerFactory) {
285+
286+
ShareKafkaListenerContainerFactory<String, String> factory =
287+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
288+
289+
// Configure acknowledgment mode at container factory level
290+
// true means explicit acknowledgment is required
291+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
292+
293+
return factory;
294+
}
295+
----
296+
297+
==== Configuration Precedence
298+
299+
When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):
300+
301+
1. **Container Properties**: `containerProperties.setExplicitShareAcknowledgment(true/false)`
302+
2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG` ("implicit" or "explicit")
303+
3. **Default**: `false` (implicit acknowledgment)
304+
305+
[[share-acknowledgment-types]]
306+
=== Acknowledgment Types
307+
308+
Share consumers support three acknowledgment types:
309+
310+
ACCEPT: Record processed successfully, mark as completed
311+
RELEASE: Temporary failure, make record available for redelivery
312+
REJECT: Permanent failure, do not retry
313+
314+
[[share-acknowledgment-api]]
315+
=== ShareAcknowledgment API
316+
317+
The `ShareAcknowledgment` interface provides methods for explicit acknowledgment:
318+
319+
[source,java]
320+
----
321+
public interface ShareAcknowledgment {
322+
void acknowledge();
323+
void release();
324+
void reject();
325+
}
326+
----
327+
328+
[[share-listener-interfaces]]
329+
=== Listener Interfaces
330+
331+
Share consumers support specialized listener interfaces for different use cases:
332+
333+
[[share-basic-listener]]
334+
==== Basic Message Listener
335+
336+
Use the standard MessageListener for simple cases:
337+
[source,java]
338+
----
339+
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
340+
public void listen(ConsumerRecord<String, String> record) {
341+
System.out.println("Received: " + record.value());
342+
// Automatically acknowledged in implicit mode
343+
}
344+
----
345+
346+
[[share-acknowledging-listener]]
347+
==== AcknowledgingShareConsumerAwareMessageListener
348+
349+
This interface provides access to the `ShareConsumer` instance with optional acknowledgment support.
350+
The acknowledgment parameter is nullable and depends on the container's acknowledgment mode:
351+
352+
===== Implicit Mode Example (acknowledgment is null)
353+
354+
[source,java]
355+
----
356+
@KafkaListener(
357+
topics = "my-topic",
358+
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
359+
)
360+
public void listen(ConsumerRecord<String, String> record,
361+
@Nullable ShareAcknowledgment acknowledgment,
362+
ShareConsumer<?, ?> consumer) {
363+
364+
// In implicit mode, acknowledgment is null
365+
System.out.println("Received: " + record.value());
366+
367+
// Access consumer metrics if needed
368+
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
369+
370+
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
371+
}
372+
----
373+
374+
===== Explicit Mode Example (acknowledgment is non-null)
375+
376+
[source,java]
377+
----
378+
@Component
379+
public class ExplicitAckListener {
380+
@KafkaListener(
381+
topics = "my-topic",
382+
containerFactory = "explicitShareKafkaListenerContainerFactory"
383+
)
384+
public void listen(ConsumerRecord<String, String> record,
385+
@Nullable ShareAcknowledgment acknowledgment,
386+
ShareConsumer<?, ?> consumer) {
387+
388+
// In explicit mode, acknowledgment is non-null
389+
try {
390+
processRecord(record);
391+
acknowledgment.acknowledge(); // ACCEPT
392+
}
393+
catch (RetryableException e) {
394+
acknowledgment.release(); // Will be redelivered
395+
}
396+
catch (Exception e) {
397+
acknowledgment.reject(); // Permanent failure
398+
}
399+
}
400+
401+
private void processRecord(ConsumerRecord<String, String> record) {
402+
// Business logic here
403+
}
404+
}
405+
----
406+
407+
[[share-acknowledgment-constraints]]
408+
=== Acknowledgment Constraints
409+
410+
In explicit acknowledgment mode, the container enforces important constraints:
411+
412+
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
413+
One-time Acknowledgment: Each record can only be acknowledged once.
414+
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
415+
416+
[WARNING]
417+
In explicit mode, failing to acknowledge records will block further message processing.
418+
Always ensure records are acknowledged in all code paths.
419+
420+
[[share-acknowledgment-timeout]]
421+
==== Acknowledgment Timeout Detection
422+
423+
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection.
424+
When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
425+
426+
[source,java]
427+
----
428+
@Bean
429+
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
430+
ShareConsumerFactory<String, String> shareConsumerFactory) {
431+
ShareKafkaListenerContainerFactory<String, String> factory =
432+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
433+
434+
// Set acknowledgment timeout (default is 30 seconds)
435+
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
436+
437+
return factory;
438+
}
439+
----
440+
441+
When a record exceeds the timeout, you'll see a warning like:
442+
----
443+
WARN: Record not acknowledged within timeout (30 seconds).
444+
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
445+
or ack.reject() for every record.
446+
----
447+
448+
This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
449+
450+
[[share-acknowledgment-examples]]
451+
=== Acknowledgment Examples
452+
453+
[[share-mixed-acknowledgment-example]]
454+
==== Mixed Acknowledgment Patterns
455+
456+
[source,java]
457+
----
458+
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
459+
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
460+
String orderId = record.key();
461+
String orderData = record.value();
462+
try {
463+
if (isValidOrder(orderData)) {
464+
if (processOrder(orderData)) {
465+
acknowledgment.acknowledge(); // Success - ACCEPT
466+
}
467+
else {
468+
acknowledgment.release(); // Temporary failure - retry later
469+
}
470+
}
471+
else {
472+
acknowledgment.reject(); // Invalid order - don't retry
473+
}
474+
}
475+
catch (Exception e) {
476+
// Exception automatically triggers REJECT
477+
throw e;
478+
}
479+
}
480+
----
481+
482+
[[share-conditional-acknowledgment-example]]
483+
==== Conditional Acknowledgment
484+
485+
[source,java]
486+
----
487+
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
488+
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
489+
ValidationResult result = validator.validate(record.value());
490+
switch (result.getStatus()) {
491+
case VALID:
492+
acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
493+
break;
494+
case INVALID_RETRYABLE:
495+
acknowledgment.acknowledge(AcknowledgeType.RELEASE);
496+
break;
497+
case INVALID_PERMANENT:
498+
acknowledgment.acknowledge(AcknowledgeType.REJECT);
499+
break;
500+
}
501+
}
502+
----
253503

254504
[[share-differences-from-regular-consumers]]
255505
== Differences from Regular Consumers
@@ -259,16 +509,19 @@ Share consumers differ from regular consumers in several key ways:
259509
1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions
260510
2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns
261511
3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously
262-
4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing
512+
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
263513
5. **Different Group Management**: Share groups use different coordinator protocols
514+
6. **No Batch Processing**: Share consumers process records individually, not in batches
264515

265516
[[share-limitations-and-considerations]]
266517
== Limitations and Considerations
267518

268519
[[share-current-limitations]]
269520
=== Current Limitations
270521

271-
* **Early Access**: This feature is in early access and may change in future versions
272-
* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported
273-
* **No Message Converters**: Message converters are not yet supported for share consumers
522+
* **In preview**: This feature is in preview mode and may change in future versions
274523
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
524+
* **No Message Converters**: Message converters are not yet supported for share consumers
525+
* **No Batch Listeners**: Batch processing is not supported with share consumers
526+
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls
527+

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
9090
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
9191
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
92+
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory;
9293
import org.springframework.kafka.listener.ContainerGroupSequencer;
9394
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
9495
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
@@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
651652
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
652653
containerFactory, beanName);
653654

655+
if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
656+
endpoint.setShareConsumer(true);
657+
}
658+
654659
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
655660
}
656661

@@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
685690
if (StringUtils.hasText(kafkaListener.batch())) {
686691
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
687692
}
693+
688694
endpoint.setBeanFactory(this.beanFactory);
689695
resolveErrorHandler(endpoint, kafkaListener);
690696
resolveContentTypeConverter(endpoint, kafkaListener);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9898

9999
private @Nullable Boolean batchListener;
100100

101+
private boolean shareConsumer;
102+
101103
private @Nullable KafkaTemplate<?, ?> replyTemplate;
102104

103105
private @Nullable String clientIdPrefix;
@@ -291,6 +293,19 @@ public void setBatchListener(boolean batchListener) {
291293
this.batchListener = batchListener;
292294
}
293295

296+
public void setShareConsumer(boolean shareConsumer) {
297+
this.shareConsumer = shareConsumer;
298+
}
299+
300+
/**
301+
* Return true if this endpoint is for a share consumer.
302+
* @return true for a share consumer endpoint.
303+
* @since 4.0
304+
*/
305+
public boolean isShareConsumer() {
306+
return this.shareConsumer;
307+
}
308+
294309
/**
295310
* Set the {@link KafkaTemplate} to use to send replies.
296311
* @param replyTemplate the template.

0 commit comments

Comments
 (0)