Skip to content

Conversation

sobychacko
Copy link
Contributor

Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types.

  • Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support

  • Add ShareAcknowledgmentException for acknowledgment failures

  • Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties

  • Add poll-level acknowledgment constraints in explicit mode

  • Add ShareConsumerAwareMessageListener for ShareConsumer access

  • Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment

  • Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration

  • Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation

  • Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking

  • Automatic error handling with REJECT acknowledgment on exceptions

  • Poll blocking in explicit mode until all records acknowledged

  • Support for mixed acknowledgment patterns within single poll

  • Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints

  • Validation preventing batch listeners with share consumers

  • Factory-level and container-level acknowledgment mode configuration

  • Message converter extensions for ShareAcknowledgment parameter injection

  • Comprehensive integration tests covering all acknowledgment scenarios

  • Constraint tests validating poll-level acknowledgment requirements

  • Unit tests for container behavior and listener dispatching

  • Updated documentation with acknowledgment examples

  • Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error

  • Explicit: Application must manually acknowledge each record before next poll

  • Explicit mode blocks subsequent polls until all records acknowledged

  • Prevents message loss and ensures proper acknowledgment ordering

  • Concurrent acknowledgment attempts properly handled with IllegalStateException

  • Processing exceptions trigger automatic REJECT acknowledgment

  • Acknowledgment failures reset state and throw ShareAcknowledgmentException

  • Container continues processing after individual record failures

  • ShareAcknowledgment parameter injection follows existing Spring Kafka patterns

  • Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts

  • Factory and container level configuration options provide flexibility

This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior.

@artembilan artembilan added this to the 4.0.0-RC1 milestone Sep 18, 2025
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The review so far.
Might be the case that some of it is void according to your new vision, but still might be helpful.

Thanks

@sobychacko sobychacko force-pushed the share-group-acknowledgment-changes branch from 178aedf to 11bf13e Compare September 24, 2025 20:11
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the review is too long.
But thanks for doing this not easy stuff!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another review round.

produceTestRecords(bootstrapServers, topic, 2);

// Wait and verify second batch is NOT processed yet
Thread.sleep(3000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is reliable way to delay.
Please, consider to use some other async barrier instead.

assertThat(pendingAcks).hasSize(3);

// Wait a bit to ensure no more records are processed while acknowledgments are pending
Thread.sleep(2000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we ensure ordering instead of vague timeout?

Implement explicit and implicit acknowledgment modes for share consumer
containers, enabling fine-grained control over record processing outcomes
with ACCEPT, RELEASE, and REJECT acknowledgment types.

- Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support
- Add ShareAcknowledgmentException for acknowledgment failures
- Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties
- Add poll-level acknowledgment constraints in explicit mode

- Add ShareConsumerAwareMessageListener for ShareConsumer access
- Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment
- Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration
- Use non-polymorphic onShareRecord method names to avoid regression issues
  with existing listener infrastructure and maintain clear API separation

- Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking
- Automatic error handling with REJECT acknowledgment on exceptions
- Poll blocking in explicit mode until all records acknowledged
- Support for mixed acknowledgment patterns within single poll

- Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints
- Validation preventing batch listeners with share consumers
- Factory-level and container-level acknowledgment mode configuration
- Message converter extensions for ShareAcknowledgment parameter injection

- Comprehensive integration tests covering all acknowledgment scenarios
- Constraint tests validating poll-level acknowledgment requirements
- Unit tests for container behavior and listener dispatching
- Updated documentation with acknowledgment examples

- Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error
- Explicit: Application must manually acknowledge each record before next poll

- Explicit mode blocks subsequent polls until all records acknowledged
- Prevents message loss and ensures proper acknowledgment ordering
- Concurrent acknowledgment attempts properly handled with IllegalStateException

- Processing exceptions trigger automatic REJECT acknowledgment
- Acknowledgment failures reset state and throw ShareAcknowledgmentException
- Container continues processing after individual record failures

- ShareAcknowledgment parameter injection follows existing Spring Kafka patterns
- Non-polymorphic listener method names (onShareRecord vs onMessage) prevent
  potential conflicts with existing listener infrastructure and ensure clear
  separation between regular and share consumer listener contracts
- Factory and container level configuration options provide flexibility

This implementation provides acknowledgment semantics for Kafka share groups
while maintaining backward compatibility with existing implicit acknowledgment behavior.

Signed-off-by: Soby Chacko <[email protected]>
…ality

- Simplify to single AcknowledgingShareConsumerAwareMessageListener interface with nullable acknowledgment
- Remove redundant ShareConsumerAwareMessageListener interface
- Clean up ShareAcknowledgmentMode enum by removing unnecessary string property
- Replace verbose null checks with Boolean.TRUE.equals() pattern
- Use enum values for acknowledgment mode validation instead of hardcoded strings
- Update tests and documentation to use unified interface
- Fix spacing issues in @nullable annotations
- Fixing other formatting issues

Improve share consumer polling behavior and add isShareConsumer() helper

- Add isShareConsumer() helper method to AbstractKafkaListenerEndpoint for cleaner boolean checks
- Replace verbose Boolean.TRUE.equals() pattern with cleaner isShareConsumer() call
- Handle KIP-932 IllegalStateException when polling with unacknowledged records in explicit mode
- Add minimal 10ms delay to prevent tight loop while maintaining responsiveness
- Remove problematic pre-poll blocking logic that prevented proper exception handling

The share consumer now properly handles the broker's IllegalStateException when attempting
to poll with unacknowledged records, as specified in KIP-932. This maintains heartbeat
while waiting for acknowledgments and prevents CPU-intensive tight loops.

Fix thread safety in ShareConsumer acknowledgments

ShareConsumer is not thread-safe and requires all access to happen on the consumer thread.
The previous implementation allowed acknowledgment calls from listener threads to directly
access the consumer, causing ConcurrentModificationException.

Changes:
- Add PendingAcknowledgment queue to safely pass acknowledgments between threads
- Process queued acknowledgments on the consumer thread during poll loop
- Remove direct consumer access from ShareConsumerAcknowledgment.acknowledgeInternal()
- Add notifyAcknowledged() callback for acknowledgment completion

This ensures all ShareConsumer interactions happen on the owning consumer thread,
eliminating race conditions between polling and acknowledgment operations.

The thread safety issue was exposed by removing the pre-poll sleep, which previously
masked the concurrent access by creating timing windows where the consumer was dormant.

Signed-off-by: Soby Chacko <[email protected]>
…message converters

  - Replace ShareAcknowledgmentMode enum with boolean explicitShareAcknowledgment field
  - Consolidate duplicate toMessage/toShareMessage methods into single toMessage with Object parameters
  - Merge invoke/invokeHandler methods in MessagingMessageListenerAdapter to use Object parameters
  - Eliminate duplicate commonHeaders methods in MessageConverter interface
  - Add validation for explicit acknowledgment mode requiring proper listener interface
  - Simplify ShareKafkaMessageListenerContainerUnitTests to focus on configuration validation
  - Other formatting fixes in docs and javadocs

Signed-off-by: Soby Chacko <[email protected]>
  - Replace Thread.sleep() with CountDownLatch assertions in tests for
    reliable synchronization and faster test execution
  - Use LogMessage.format() instead of Supplier with String.format()
    for consistent logging patterns
  - Fix lambda expression to use effectively final variable in error logging
  - Change shareConsumer field from nullable Boolean to primitive boolean
    in AbstractKafkaListenerEndpoint for cleaner null handling
  - Make utility methods static where appropriate for better code organization

Signed-off-by: Soby Chacko <[email protected]>
  constraint and integration tests, as testing for non-occurrence
  (await expecting false) still blocks for full timeout duration

- Use chained assertion style for acknowledgment validations

Signed-off-by: Soby Chacko <[email protected]>
@sobychacko sobychacko force-pushed the share-group-acknowledgment-changes branch from 37a23ac to 6bc2806 Compare October 2, 2025 14:41
  - Replace Thread.sleep() with Awaitility pattern that verifies poll blocking via trace log
  - Add Mockito spy on LogAccessor to confirm the logging message
  - Migrate shouldHandlePartialAcknowledgmentCorrectly test from ConstraintTests
  - Migrate shouldHandleConcurrentAcknowledgmentAttempts test from ConstraintTests
  - Fix topic collision by assigning unique topics to each test
  - Remove duplicate ShareKafkaMessageListenerContainerConstraintTests class

  This improves test reliability by verifying actual blocking behavior rather than
  assuming Thread.sleep duration is sufficient, and eliminates test duplication.

Signed-off-by: Soby Chacko <[email protected]>
@sobychacko sobychacko force-pushed the share-group-acknowledgment-changes branch from 6bc2806 to 43b8c43 Compare October 2, 2025 14:42
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's it.
Everything else is just great!

containerFactory, beanName);

if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
endpoint.setShareConsumer(Boolean.TRUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, address.


if (clientAckMode != null) {
String mode = clientAckMode.toString().toLowerCase();
if ("explicit".equals(mode)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rely on the ShareAcknowledgementMode.fromString() API instead of the manual logic in this method?

import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.ShareAcknowledgment;
import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter;
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to avoid deprecated imports and use FQCN instead of the place of use where you already have that @SuppressWarnings("removal")

* @author Soby Chacko
* @since 4.0
*/
public class ShareAcknowledgmentException extends KafkaException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this class is out of use.
Perhaps just remove?

  - Change autoStartup and phase from nullable wrappers to primitives with defaults
  - Add @SuppressWarnings(NullAway.Init) to applicationEventPublisher and applicationContext
  - Use ShareAcknowledgementMode.fromString() API instead of manual string parsing
  - Use ArgumentMatchers with explicit type parameters to eliminate unchecked warnings in tests
  - Remove deprecated imports from ShareRecordMessagingMessageListenerAdapter
  - Use FQCN for JacksonProjectingMessageConverter and ProjectingMessageConverter
  - Fix syntax error in kafka-queues.adoc documentation example

Signed-off-by: Soby Chacko <[email protected]>
RecordMessageConverter messageConverter = getMessageConverter();
if (!(messageConverter instanceof JacksonProjectingMessageConverter
|| messageConverter instanceof ProjectingMessageConverter)) {
if (!(messageConverter instanceof org.springframework.kafka.support.converter.JacksonProjectingMessageConverter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an overreacting.
I have talked only about a deprecated ProjectingMessageConverter class.

@artembilan artembilan merged commit 7e64584 into spring-projects:main Oct 3, 2025
2 of 3 checks passed
artembilan added a commit to spring-projects/spring-integration that referenced this pull request Oct 3, 2025
Related to: spring-projects/spring-kafka#4087

In particular the `MessageConverter` in SK has a new signature from now on:
```
toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object consumer, Type type);
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants