Conversation

@max-datahub (Collaborator)

Add OAuth Callback Support for Kafka Sink Producer Config

Summary

This PR completes OAuth callback support for Kafka producers, enabling MSK IAM authentication and custom OAuth mechanisms for Kafka sinks. This matches the existing functionality available for Kafka sources (consumers).

Problem Statement

Customers using Kafka sinks with MSK IAM authentication were encountering:

Failed to configure the sink (datahub-kafka): expected oauth_cb property as a callable function

The oauth_cb parameter validation and resolution existed only for consumer configs, but DataHub's Kafka sink uses producer configs. This caused authentication failures when trying to use MSK IAM or custom OAuth with Kafka sinks.

Changes

1. Rename CallableConsumerConfig → KafkaOAuthCallbackResolver

  • Renamed the class for clarity and generality
  • Updated all references across the codebase (sink kafka.py, source kafka.py, kafka_emitter.py)
  • More accurately reflects that this resolver works for both consumers and producers
  • Files: kafka_consumer_config.py, kafka.py, ingestion/source/kafka/kafka.py

2. Producer OAuth Initialization (kafka_emitter.py)

  • Added OAuth callback initialization in DatahubKafkaEmitter.__init__()
  • Calls producer.poll(0) on startup when OAuth is detected
  • Critical for OAuth mechanisms like AWS MSK IAM where authentication tokens must be obtained before the first message send
  • Without this initialization, producers fail with authentication errors
  • Code:

    ```python
    if KafkaOAuthCallbackResolver.is_callable_config(self.config.connection.producer_config):
        logger.debug("OAuth callback detected, triggering OAuth callbacks for Kafka producers")
        for producer in self.producers.values():
            producer.poll(0)  # Non-blocking - just triggers the OAuth callback
    ```

3. Kafka Callback Error Handling Fix (datahub_kafka.py)

  • Fixed _KafkaCallback.kafka_callback signature to match actual Kafka delivery callback
  • Changed from (err: Optional[Exception], msg: str) to (err, msg)
  • Properly handles KafkaError and Message objects from confluent-kafka
  • Converts KafkaError to Exception for consistent error handling downstream
  • Fixes "NoneType: None" errors in sink callbacks

4. Documentation Simplification

  • Simplified custom OAuth callback deployment guidance in kafka.md
  • Removed verbose examples, kept essential information
  • Easier to maintain and understand

Why Producer Initialization is Required

The initial poll(0) call in DatahubKafkaEmitter.__init__() is not just validation - it's required for OAuth authentication to work:

  1. When a Kafka producer is created with OAuth config, the OAuth callback hasn't been invoked yet
  2. The OAuth callback only gets triggered when poll() is called on the producer
  3. Until poll() is called, the producer doesn't have an authentication token and can't connect to the broker
  4. The initial poll(0) forces the OAuth callback to execute immediately, acquiring the auth token before any messages are sent

Why consumers don't need this: Consumers have an implicit polling loop for receiving messages, which automatically triggers the OAuth callback. Producers don't have this natural polling mechanism, so it must be explicit.
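The timing described above can be illustrated with a toy stand-in for a confluent-kafka producer (everything here is a simplified sketch; the real client defers the OAuth callback to its internal event loop, which poll() drives):

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

class FakeOAuthProducer:
    """Toy producer showing *when* oauth_cb fires: not at construction,
    only once poll() runs the client's event loop."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self._oauth_cb: Optional[Callable[[Any], Tuple[str, float]]] = config.get("oauth_cb")
        self.token: Optional[str] = None  # no token yet at construction time

    def poll(self, timeout: float) -> int:
        # poll() drives the event loop; this is where the OAuth callback
        # finally executes and a token is acquired.
        if self._oauth_cb is not None and self.token is None:
            self.token, _expiry = self._oauth_cb(None)
        return 0

def msk_iam_like_cb(_config: Any) -> Tuple[str, float]:
    # Hypothetical token fetcher; a real MSK IAM callback would sign an
    # AWS request here and return (token, expiry_seconds).
    return "fake-token", time.time() + 900

producer = FakeOAuthProducer({"oauth_cb": msk_iam_like_cb})
assert producer.token is None  # created, but not yet authenticated
producer.poll(0)               # explicit poll triggers the callback
assert producer.token == "fake-token"
```

This is why the emitter's initial poll(0) matters: without it, the first produce() attempt would race against an unauthenticated connection.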

Testing

✅ Unit Tests

  • 3 new unit tests for producer OAuth callback validation
  • All existing Kafka source OAuth tests continue to pass
  • Test command: pytest tests/unit/test_kafka_sink.py -k oauth

✅ End-to-End Testing

Successfully tested File → Kafka sink with MSK IAM OAuth:

  • 8 events produced in 0.91 seconds
  • OAuth callback triggered successfully
  • No timeout or authentication errors
  • Logs confirm proper OAuth initialization and token acquisition

Test environment: Kubernetes with IRSA (IAM Roles for Service Accounts), MSK with IAM authentication

✅ Linting

  • ruff check passes
  • Pre-commit hooks pass
  • Code formatting applied

Context

This PR builds on previous work (PR #15420) that added oauth_cb validation for producer_config. Together, these changes enable complete MSK IAM authentication support for Kafka sinks, achieving parity with Kafka sources.

Configuration Example

Kafka Sink with MSK IAM (now works):

```yaml
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "your-msk-cluster:9098"
      schema_registry_url: "http://your-schema-registry:8080"
      producer_config:
        security.protocol: "SASL_SSL"
        sasl.mechanism: "OAUTHBEARER"
        sasl.oauthbearer.method: "default"
        oauth_cb: "datahub_actions.utils.kafka_msk_iam:oauth_cb"
```
Requirement: pip install acryl-datahub-actions>=1.3.1.2

Deployment Notes

  • No breaking changes - existing configurations continue to work
  • New capability - MSK IAM and custom OAuth now work for Kafka sinks
  • Backward compatible - class rename is internal, no external API changes

Checklist

  • Unit tests added/updated
  • End-to-end testing completed
  • Documentation updated
  • Linting and formatting pass
  • No breaking changes
  • OAuth functionality validated with MSK IAM

Enable MSK IAM authentication and other OAuth mechanisms for Kafka sinks
by adding oauth_cb resolution to producer_config, matching the existing
consumer_config functionality.

Changes:
- Add @field_validator for producer_config in KafkaProducerConnectionConfig
  to resolve oauth_cb string paths to callable functions
- Add unit tests for producer oauth_cb validation and resolution
- Update documentation with producer oauth_cb examples including MSK IAM

Fixes issue where datahub-kafka sink would fail with 'expected oauth_cb
property as a callable function' when trying to use MSK IAM authentication.

The fix reuses the existing CallableConsumerConfig class, maintaining
consistency between consumer and producer authentication patterns.
…tion

- Create _resolve_kafka_oauth_callback() helper to eliminate code duplication
- Both KafkaConsumerConnectionConfig and KafkaProducerConnectionConfig now use the same helper
- Simplify unit tests to avoid duplicating OAuth validation tests already covered by Kafka source tests
- Add documentation to the helper function explaining its purpose

This refactoring improves code maintainability by having a single source of truth for OAuth callback resolution logic.
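The shared helper can be sketched roughly as follows (the function name, error messages, and validation details are illustrative, not the exact DataHub implementation):

```python
import importlib
import math
from typing import Any, Dict

def resolve_kafka_oauth_callback(config: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve an oauth_cb value like "some.module:func" into a callable."""
    oauth_cb = config.get("oauth_cb")
    if oauth_cb is None:
        return config
    if not isinstance(oauth_cb, str):
        # Direct callables are rejected: recipe configs come from YAML, so
        # only "module.path:function" string paths are meaningful here.
        raise ValueError("oauth_cb must be a string path, not a callable")
    module_name, sep, attr = oauth_cb.partition(":")
    if not sep:
        raise ValueError("oauth_cb must look like 'module.path:function'")
    config["oauth_cb"] = getattr(importlib.import_module(module_name), attr)
    return config

# Using a stdlib function purely as a stand-in for a real OAuth callback:
resolved = resolve_kafka_oauth_callback({"oauth_cb": "math:sqrt"})
assert resolved["oauth_cb"] is math.sqrt
```

Both the consumer and producer field validators can then delegate to this one function, which is the single-source-of-truth point the commit message describes.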
1. Refactor tests for clarity (addresses review feedback):
   - Rename test_kafka_sink_producer_config_accepts_oauth_cb → test_kafka_sink_producer_config_without_oauth_cb
   - Add test_kafka_sink_producer_config_with_oauth_cb to verify oauth_cb resolution works
   - Keep test_kafka_sink_oauth_cb_rejects_callable for edge case validation
   - Now have clear separation: backward compatibility test, positive test, negative test

2. Enhance OAuth callback deployment documentation:
   - Add explicit pip install command for acryl-datahub-actions>=1.3.1.2
   - Document built-in callbacks (MSK IAM, Azure Event Hubs) with package requirements
   - Add PYTHONPATH configuration examples for custom callbacks
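For a custom callback module, the PYTHONPATH setup might look like the following (the /opt/datahub/plugins path and the my_oauth module name are hypothetical placeholders):

```shell
# Hypothetical layout: custom callback code in /opt/datahub/plugins/my_oauth.py,
# exposing oauth_cb(config) that returns a (token, expiry_seconds) tuple.
export PYTHONPATH="/opt/datahub/plugins:${PYTHONPATH}"

# The recipe then references the callback as a string path:
#   producer_config:
#     oauth_cb: "my_oauth:oauth_cb"
```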

Test coverage:
- ✅ Standard config without oauth_cb works (no breaking change)
- ✅ OAuth callback string path resolves to callable (new functionality)
- ✅ Direct callable objects are rejected (edge case validation)
…llback resolver

This change completes OAuth support for Kafka producers by adding proper
initialization and refactoring the OAuth callback resolver class for clarity.

## Changes:

### 1. Rename CallableConsumerConfig → KafkaOAuthCallbackResolver
- Renamed class for better clarity and generalization
- Updated all references across codebase (kafka.py, kafka.py source, etc.)
- More accurately reflects that this resolver works for both consumers and producers

### 2. Producer OAuth Initialization (kafka_emitter.py)
- Added OAuth callback initialization in DatahubKafkaEmitter.__init__()
- Calls producer.poll(0) on startup when OAuth is detected
- Critical for OAuth mechanisms like AWS MSK IAM where tokens must be obtained
  before first message send
- Without this, producers fail with authentication errors

### 3. Kafka Callback Error Handling Fix (datahub_kafka.py)
- Fixed _KafkaCallback.kafka_callback signature to match actual Kafka callback
- Changed from (err: Optional[Exception], msg: str) to (err, msg)
- Properly handles KafkaError and Message objects from confluent-kafka
- Converts KafkaError to Exception for consistent error handling downstream

### 4. Documentation Simplification
- Simplified custom OAuth callback deployment guidance
- Removed verbose examples, kept essential information
- Easier to maintain and understand

## Context:

This builds on previous work that added oauth_cb validation for producer_config.
Together, these changes enable full MSK IAM authentication support for Kafka sinks,
matching the existing functionality available for Kafka sources.

## Testing:

✅ End-to-end: File→Kafka sink with MSK IAM OAuth (8 events in 0.91s)
✅ Unit tests: OAuth producer validation tests pass
✅ Linting: ruff checks pass
✅ Existing tests: All Kafka source OAuth tests continue to pass
@github-actions bot added labels "ingestion" (PR or Issue related to the ingestion of metadata) and "community-contribution" (PR or Issue raised by member(s) of DataHub Community) on Nov 28, 2025
- Add unit tests for OAuth producer initialization in DatahubKafkaEmitter
- Test verifies poll(0) is called when OAuth is configured
- Test verifies poll(0) is NOT called without OAuth (backwards compatibility)
- Add type annotations to kafka_callback (err: Any, msg: Any) to satisfy mypy
- All linting passes (ruff check, ruff format, mypy)
- All tests pass (2 new tests added)

This closes the test coverage gap identified in the backwards compatibility review.
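The two initialization tests can be sketched roughly like this, using a stand-in init function in place of the real DatahubKafkaEmitter (all names here are illustrative):

```python
from typing import Any, Dict
from unittest.mock import MagicMock

def init_producers(producer_config: Dict[str, Any], producers: Dict[str, Any]) -> None:
    # Mirrors the emitter logic: only poll when an OAuth callback is present.
    if callable(producer_config.get("oauth_cb")):
        for producer in producers.values():
            producer.poll(0)

def test_poll_called_with_oauth() -> None:
    producer = MagicMock()
    init_producers({"oauth_cb": lambda cfg: ("token", 0.0)}, {"mcp": producer})
    producer.poll.assert_called_once_with(0)

def test_poll_not_called_without_oauth() -> None:
    producer = MagicMock()
    init_producers({}, {"mcp": producer})
    producer.poll.assert_not_called()
```

MagicMock makes the poll-count assertion trivial, so the backward-compatibility case (no OAuth, no extra poll) is covered alongside the positive case.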