Skip to content

Conversation

@kumarpritam863
Copy link
Contributor

@kumarpritam863 kumarpritam863 commented Nov 24, 2025

Summary

This PR adds support for pluggable Committer implementations in the Iceberg Kafka Connect connector, allowing users to provide custom commit strategies via configuration. Previously, the connector was hardcoded to use CommitterImpl. This change enables users to implement and configure custom committers without modifying the connector source code.

Motivation

Different use cases may require different commit strategies:

  • Alternative coordination mechanisms - Users may want to use external consensus systems (e.g., ZooKeeper, etcd, Apache Ratis)
  • Custom commit policies - Time-based, size-based, or business-logic-driven commit strategies
  • Enhanced monitoring - Custom metrics collection and observability
  • Specialized error handling - Domain-specific retry logic and failure recovery

The current implementation hardcodes CommitterImpl, requiring users to fork the codebase to implement custom commit logic. This PR makes the committer pluggable while maintaining full backward
compatibility.

Backward Compatibility

This change is fully backward compatible:

  • When iceberg.committer.class is not configured, uses default CommitterImpl
  • Existing configurations work without modifications
  • No changes to the Committer interface
  • No breaking API changes

Usage Example

Uses default CommitterImpl
Default Behavior (No Configuration Changes):
iceberg.tables=my_db.my_table
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://my-bucket/warehouse

Loads custom committer
Custom Committer:
iceberg.tables=my_db.my_table
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://my-bucket/warehouse
iceberg.committer.class=com.example.MyCustomCommitter

Testing

Test coverage includes:

  • ✅ Default committer instantiation and operation
  • ✅ Custom committer lifecycle
  • ✅ Factory error handling
  • ✅ Configuration validation
  • ✅ Complete lifecycle flows
  • ✅ Instance isolation

Files Changed

Modified:

  • kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
  • kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java

Added:

  • kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCustomCommitter.java
  • kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCommitterFactory.java
  • kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitter.java

Removed:

  • kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java

@kumarpritam863
Copy link
Contributor Author

@bryanck for review.

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface Committer {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might look like backward incompatible change but as this config option to plug committer is not there it is very highly likely that users are using the CommitterImpl by default and this is the right time to change these API's and make this sensible and in sync with the Connect Task terminologies. Also this helps us prevent the current hacks and edge cases like using isInitialized to check multiple times and at multiple places to avoid having null pointer exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant