@treff7es treff7es commented Nov 7, 2025

Summary

This PR adds automatic lineage inference from DataHub to the Kafka Connect source connector. Instead of relying solely on connector manifests, the ingestion can now query DataHub's metadata graph to resolve schemas and generate both table-level and column-level lineage.

Motivation

Currently, Kafka Connect lineage extraction is limited by what's explicitly declared in connector configurations. This PR enables:

  1. Wildcard pattern expansion: Connectors configured with patterns like table.include.list: "database.*" can now be resolved to actual table names by querying DataHub
  2. Column-level lineage: Generate fine-grained lineage showing which source columns map to Kafka topic fields
  3. Schema-aware ingestion: Leverage existing metadata in DataHub to enrich Kafka Connect lineage without requiring external database connections
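
As a rough illustration of the pattern-expansion idea in point 1, the snippet below matches a connector-style include pattern against table names already known to DataHub. The helper name and the in-memory table list are hypothetical stand-ins for the actual metadata-graph query, not the PR's implementation.

```python
import fnmatch

def expand_table_pattern(pattern: str, known_tables: list[str]) -> list[str]:
    """Expand a wildcard include pattern (e.g. "database.*") against
    table names resolved from DataHub's metadata graph.

    `known_tables` stands in for the result of a real graph query.
    """
    return sorted(t for t in known_tables if fnmatch.fnmatch(t, pattern))

# Hypothetical tables previously ingested into DataHub:
tables = ["database.users", "database.orders", "analytics.events"]
print(expand_table_pattern("database.*", tables))
# ['database.orders', 'database.users']
```

With this shape, a connector configured with table.include.list: "database.*" resolves to concrete upstream datasets instead of emitting lineage against the raw pattern.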

Changes

New Configuration Options

Added three new configuration fields to KafkaConnectSourceConfig:

source:
  type: kafka-connect
  config:
    # Enable DataHub schema resolution (default: false for backward compatibility)
    use_schema_resolver: true
    
    # Expand wildcard patterns to concrete table names (default: true)
    schema_resolver_expand_patterns: true
    
    # Generate column-level lineage (default: true)
    schema_resolver_finegrained_lineage: true
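
The three flags and their defaults can be sketched as a plain dataclass; this is an illustrative stand-in for the relevant subset of KafkaConnectSourceConfig, not the actual class definition.

```python
from dataclasses import dataclass

@dataclass
class SchemaResolverSettings:
    """Illustrative stand-in for the new KafkaConnectSourceConfig fields."""
    # Master switch; off by default for backward compatibility.
    use_schema_resolver: bool = False
    # Expand wildcard patterns (e.g. "database.*") to concrete table names.
    schema_resolver_expand_patterns: bool = True
    # Emit column-level (fine-grained) lineage.
    schema_resolver_finegrained_lineage: bool = True

cfg = SchemaResolverSettings(use_schema_resolver=True)
print(cfg.use_schema_resolver, cfg.schema_resolver_expand_patterns)
# True True
```

Note that the sub-flags default to true but are inert until use_schema_resolver is enabled, which keeps the default behavior unchanged.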

Core Components

  1. SchemaResolver Integration (connector_registry.py):

    • New create_schema_resolver() method to instantiate schema resolvers with platform-specific configurations
    • Automatically attaches resolvers to connector instances during creation
    • Passes pipeline context through the instantiation chain
  2. Fine-Grained Lineage Extraction (common.py):

    • New _extract_fine_grained_lineage() method in BaseConnector
    • Assumes 1:1 column mapping between source tables and Kafka topics (typical for CDC connectors)
    • Generates FineGrainedLineageClass instances for column-level lineage
  3. Snowflake Source Connector (source_connectors.py):

    • New connector for Confluent Cloud Snowflake Source
    • Pattern expansion support (e.g., ANALYTICS.PUBLIC.* → actual tables)
    • Efficient caching to avoid redundant DataHub queries
    • Proper topic naming following Snowflake connector conventions
  4. Platform Support (source_connectors.py):

    • Platform-specific schema resolution for PostgreSQL, MySQL, Snowflake, MongoDB, etc.
    • Handles platform URN generation and schema field mapping
    • Integration with existing JDBC and Debezium connectors
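
The 1:1 column-mapping assumption behind _extract_fine_grained_lineage() can be sketched as follows. The URN construction follows DataHub's schemaField URN convention, but the function names here are simplified stand-ins, not the actual implementation in common.py.

```python
def schema_field_urn(dataset_urn: str, field_path: str) -> str:
    # DataHub schemaField URNs embed the parent dataset URN and the field path.
    return f"urn:li:schemaField:({dataset_urn},{field_path})"

def one_to_one_column_lineage(
    source_urn: str, topic_urn: str, columns: list[str]
) -> list[tuple[str, str]]:
    """Assume each source column maps to an identically named topic field
    (typical for CDC connectors) and pair up the schemaField URNs."""
    return [
        (schema_field_urn(source_urn, col), schema_field_urn(topic_urn, col))
        for col in columns
    ]

src = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.users,PROD)"
topic = "urn:li:dataset:(urn:li:dataPlatform:kafka,server.public.users,PROD)"
for upstream, downstream in one_to_one_column_lineage(src, topic, ["id", "email"]):
    print(upstream, "->", downstream)
```

In the real code each upstream/downstream pair would populate a FineGrainedLineageClass instance rather than a plain tuple.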

Code Quality Improvements

  • Removed redundant connector instantiation in _derive_topics_from_config()
  • Cleaned up unused legacy methods (extract_lineages(), extract_flow_property_bag())
  • Improved null safety with comprehensive checks before schema resolver access
  • Enhanced error handling with detailed logging and graceful fallbacks

Usage

Before (pattern-based config):

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    # Connector with wildcard pattern
    # Lineage only shows pattern, not actual tables

After (schema-aware):

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    use_schema_resolver: true
    schema_resolver_expand_patterns: true
    schema_resolver_finegrained_lineage: true
    # Now resolves actual tables and generates column lineage

Testing

  • ✅ 138 tests passing (117 existing + 21 new)
  • ✅ All linting checks pass (ruff)
  • ✅ All type checks pass (mypy)
  • ✅ Test categories:
    • Transform pipeline tests (9)
    • Connector implementation tests (30+)
    • Cloud connector tests (12)
    • Error handling tests (4)
    • Schema resolver integration tests (21)

New test coverage:

  • Pattern expansion with wildcards
  • Fine-grained lineage generation
  • Snowflake Source connector
  • Schema resolver integration
  • Error handling and fallback scenarios
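
The fallback behavior exercised by the error-handling tests can be illustrated like this; resolve_or_fallback and broken_resolver are hypothetical names invented for the sketch, not the actual test helpers.

```python
def resolve_or_fallback(pattern, resolver):
    """Try schema resolution; on any failure, fall back to emitting
    the raw pattern so existing table-level lineage is preserved."""
    try:
        tables = resolver(pattern)
        return tables if tables else [pattern]
    except Exception:
        # Graceful fallback: keep pattern-level lineage instead of failing
        # the whole ingestion run.
        return [pattern]

def broken_resolver(pattern):
    raise ConnectionError("DataHub unreachable")

print(resolve_or_fallback("database.*", broken_resolver))
# ['database.*']
```

The key property under test is that a resolver outage degrades to the pre-PR behavior rather than aborting ingestion.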

Breaking Changes

None. All features are opt-in via configuration flags; existing Kafka Connect ingestions continue to work unchanged.

Default behavior (backward compatible):

  • use_schema_resolver: false (disabled by default)
  • No schema resolution without explicit configuration
  • No changes to existing connector behavior

Documentation

All configuration options include comprehensive descriptions:

  • use_schema_resolver: Master switch for the feature
  • schema_resolver_expand_patterns: Controls pattern expansion behavior
  • schema_resolver_finegrained_lineage: Controls column-level lineage generation

🤖 Generated with Claude Code

Co-Authored-By: Claude [email protected]

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Nov 7, 2025

codecov bot commented Nov 7, 2025

❌ 2 Tests Failed:

Tests completed: 4767 | Failed: 2 | Passed: 4765 | Skipped: 31

Top 2 failed tests, by shortest run time:
tests.integration.hex.test_hex::test_hex_ingestion_with_lineage
Stack Traces | 0.001s run time
docker_compose_runner = <function docker_compose_runner.<locals>.run at 0x7f608e6aa160>
test_resources_dir = PosixPath('.../tests/integration/hex')

    @pytest.fixture(scope="module")
    def hex_mock_api_runner(docker_compose_runner, test_resources_dir):
        docker_dir = test_resources_dir / "docker"
    
        # Start Docker Compose
>       with docker_compose_runner(
            docker_dir / "docker-compose.yml", "hex-mock", parallel=1
        ) as docker_services:

.../integration/hex/test_hex.py:34: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.13....../x64/lib/python3.11/contextlib.py:137: in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
.../datahub/testing/docker_utils.py:65: in run
    with pytest_docker.plugin.get_docker_services(
.../hostedtoolcache/Python/3.11.13....../x64/lib/python3.11/contextlib.py:137: in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
venv/lib/python3.11........./site-packages/pytest_docker/plugin.py:212: in get_docker_services
    docker_compose.execute(command)
venv/lib/python3.11........./site-packages/pytest_docker/plugin.py:140: in execute
    return execute(command, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command = 'docker compose --parallel 1 -f ".../tests/integration/hex/docker/docker-compose.yml" -p "pytest6206-hex-mock" up --build --wait'
success_codes = (0,), ignore_stderr = False

    def execute(command: str, success_codes: Iterable[int] = (0,), ignore_stderr: bool = False) -> Union[bytes, Any]:
        """Run a shell command."""
        try:
            stderr_pipe = subprocess.DEVNULL if ignore_stderr else subprocess.STDOUT
            output = subprocess.check_output(command, stderr=stderr_pipe, shell=True)
            status = 0
        except subprocess.CalledProcessError as error:
            output = error.output or b""
            status = error.returncode
            command = error.cmd
    
        if status not in success_codes:
>           raise Exception(
                'Command {} returned {}: """{}""".'.format(command, status, output.decode("utf-8"))
            )
E           Exception: Command docker compose --parallel 1 -f ".../tests/integration/hex/docker/docker-compose.yml" -p "pytest6206-hex-mock" up --build --wait returned 1: """time="2025-11-14T13:10:27Z" level=warning msg=".../tests/integration/hex/docker/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion"
E            datahub-mock-api Pulling 
E            2d35ebdb57d9 Pulling fs layer 
E            c9008197b8dd Pulling fs layer 
E            27711589a568 Pulling fs layer 
E            9806928fa0da Pulling fs layer 
E            9806928fa0da Waiting 
E            2d35ebdb57d9 Downloading [>                                                  ]  50.67kB/3.802MB
E            c9008197b8dd Downloading [=>                                                 ]  16.38kB/456.9kB
E            c9008197b8dd Downloading [==================================================>]  456.9kB/456.9kB
E            c9008197b8dd Verifying Checksum 
E            c9008197b8dd Download complete 
E            2d35ebdb57d9 Download complete 
E            2d35ebdb57d9 Extracting [>                                                  ]  65.54kB/3.802MB
E            9806928fa0da Downloading [==================================================>]     247B/247B
E            9806928fa0da Verifying Checksum 
E            9806928fa0da Download complete 
E            2d35ebdb57d9 Extracting [==================================================>]  3.802MB/3.802MB
E            2d35ebdb57d9 Pull complete 
E            c9008197b8dd Extracting [===>                                               ]  32.77kB/456.9kB
E            27711589a568 Downloading [>                                                  ]  155.1kB/15.4MB
E            c9008197b8dd Extracting [==================================================>]  456.9kB/456.9kB
E            27711589a568 Verifying Checksum 
E            27711589a568 Download complete 
E            c9008197b8dd Extracting [==================================================>]  456.9kB/456.9kB
E            c9008197b8dd Pull complete 
E            27711589a568 Extracting [>                                                  ]  163.8kB/15.4MB
E            27711589a568 Extracting [==============>                                    ]  4.588MB/15.4MB
E            27711589a568 Extracting [=====================================>             ]  11.47MB/15.4MB
E            27711589a568 Extracting [=============================================>     ]  14.09MB/15.4MB
E            27711589a568 Extracting [==================================================>]   15.4MB/15.4MB
E            27711589a568 Pull complete 
E            9806928fa0da Extracting [==================================================>]     247B/247B
E            9806928fa0da Extracting [==================================================>]     247B/247B
E            9806928fa0da Pull complete 
E            datahub-mock-api Pulled 
E            hex-mock-api Pulling 
E            hex-mock-api Pulled 
E            Network pytest6206-hex-mock_default  Creating
E            Network pytest6206-hex-mock_default  Created
E            Container hex-mock-api  Creating
E            Container datahub-mock-api  Creating
E            Container hex-mock-api  Created
E            Container datahub-mock-api  Created
E            Container datahub-mock-api  Starting
E            Container hex-mock-api  Starting
E            Container hex-mock-api  Started
E            Container datahub-mock-api  Started
E            Container datahub-mock-api  Waiting
E            Container hex-mock-api  Waiting
E           container datahub-mock-api is unhealthy
E           """.

venv/lib/python3.11........./site-packages/pytest_docker/plugin.py:37: Exception
tests.integration.hex.test_hex::test_hex_ingestion
Stack Traces | 16.9s run time


@treff7es treff7es changed the title Kafka connect datahub infer lineage feat(ingest/kafka-connect): Kafka connect infer lineage from DataHub Nov 10, 2025
@treff7es treff7es marked this pull request as ready for review November 10, 2025 09:13
@treff7es treff7es force-pushed the kafka_connect_cloud_improvements branch from fa085cd to 2ede18b Compare November 10, 2025 09:14
@treff7es treff7es force-pushed the kafak_connect_datahub_infer_lineage branch from bf95de8 to 724d682 Compare November 10, 2025 09:17
@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Nov 10, 2025