
**Note**: When `use_connect_topics_api` is `false`, topic information will not be extracted, which may impact lineage accuracy but improves performance and works in air-gapped environments.

### Enhanced Topic Resolution for Source and Sink Connectors

DataHub now provides intelligent topic resolution that works reliably across all environments, including Confluent Cloud, where the Kafka Connect topics API is unavailable.

#### How It Works

**Source Connectors** (Debezium, Snowflake CDC, JDBC):

- Always derive expected topics from connector configuration (`table.include.list`, `database.include.list`)
- Apply configured transforms (RegexRouter, EventRouter, etc.) to predict final topic names (see the sketch below)
- When Kafka API is available: Filter to only topics that exist in Kafka
- When Kafka API is unavailable (Confluent Cloud): Create lineages for all configured tables without filtering
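
For example, a minimal sketch of how a configured RegexRouter transform changes the predicted topic names for a hypothetical Debezium PostgreSQL connector (the server name, table list, and transform alias are illustrative; the transform properties are standard Kafka Connect RegexRouter settings):

```yml
# Debezium PostgreSQL source with a RegexRouter transform (hypothetical config)
connector.config:
  database.server.name: "pg-server"
  table.include.list: "public.users,public.orders"
  transforms: "route"
  transforms.route.type: "org.apache.kafka.connect.transforms.RegexRouter"
  transforms.route.regex: "(.*)\\.(.*)\\.(.*)" # matches server.schema.table
  transforms.route.replacement: "$3" # keep only the table name
  # Topics derived from config: pg-server.public.users, pg-server.public.orders
  # Topics after RegexRouter:   users, orders
  # Lineage is created against the post-transform topic names
```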

**Sink Connectors** (S3, Snowflake, BigQuery, JDBC):

- Support both explicit topic lists (`topics` field, sketched after this list) and regex patterns (`topics.regex` field)
- When `topics.regex` is used:
- Priority 1: Match against `manifest.topic_names` from Kafka API (if available)
- Priority 2: Query DataHub for Kafka topics and match pattern (if `use_schema_resolver` enabled)
- Priority 3: Warn user that pattern cannot be expanded
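
When the `topics` field is used instead, no pattern expansion is required. A minimal sketch, assuming a hypothetical S3 sink with two explicitly listed topics:

```yml
# S3 sink connector with an explicit topic list (hypothetical config)
connector.config:
  topics: "analytics.users,analytics.orders"
  # The listed topics are used directly as lineage inputs;
  # no Kafka API call or DataHub topic query is needed
```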

#### Configuration Examples

**Source Connector with Pattern Expansion:**

```yml
# Debezium PostgreSQL source with wildcard tables
connector.config:
table.include.list: "public.analytics_.*"
# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all PostgreSQL tables matching pattern
# 2. Derive expected topic names (server.schema.table format)
# 3. Apply transforms if configured
# 4. Create lineages without Kafka validation
```

**Sink Connector with topics.regex (Confluent Cloud):**

```yml
# S3 sink connector consuming from pattern-matched topics
connector.config:
topics.regex: "analytics\\..*" # Match topics like analytics.users, analytics.orders
# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all Kafka topics (requires use_schema_resolver: true)
# 2. Match topics against the regex pattern
# 3. Create lineages for matched topics
```

**Enable DataHub Topic Querying for Sink Connectors:**

```yml
source:
type: kafka-connect
config:
connect_uri: "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"
username: "your-connect-api-key"
password: "your-connect-api-secret"

# Enable DataHub schema resolver for topic pattern expansion
use_schema_resolver: true # Required for topics.regex fallback

    # Configure the DataHub connection used for topic queries
    datahub_api:
      server: "http://localhost:8080" # Your DataHub GMS endpoint
```

#### Key Benefits

1. **Confluent Cloud Support**: Both source and sink connectors work correctly with pattern-based configurations
2. **Config as Source of Truth**: Source connectors always derive topics from configuration, not from querying all tables in DataHub
3. **Smart Fallback**: Sink connectors can query DataHub for Kafka topics when Kafka API is unavailable
4. **Pattern Expansion**: Wildcards in `table.include.list` and `topics.regex` are properly expanded
5. **Transform Support**: All transforms (RegexRouter, EventRouter, etc.) are applied correctly

#### When DataHub Topic Querying is Used

DataHub will query for topics in these scenarios:

**Source Connectors:**

- When expanding wildcard patterns in `table.include.list` (e.g., `ANALYTICS.PUBLIC.*`)
- Queries source platform (PostgreSQL, MySQL, etc.) for tables matching the pattern

**Sink Connectors:**

- When `topics.regex` is used AND Kafka API is unavailable (Confluent Cloud)
- Queries DataHub's Kafka platform for topics matching the regex pattern
- Requires `use_schema_resolver: true` in configuration

**Important Notes:**

- DataHub never queries "all tables" to create lineages - config is always the source of truth
- Source connectors query source platforms (databases) to expand table patterns
- Sink connectors query Kafka platform to expand topic regex patterns
- Both require appropriate DataHub credentials and connectivity

### Using DataHub Schema Resolver for Pattern Expansion and Column-Level Lineage

The Kafka Connect source can query DataHub for schema information to provide two capabilities:

1. **Pattern Expansion** - Converts wildcard patterns like `database.*` into actual table names by querying DataHub
2. **Column-Level Lineage** - Generates field-level lineage by matching schemas between source tables and Kafka topics

Both features require existing metadata in DataHub from your database and Kafka schema registry ingestion.

#### Configuration Overview

```yml
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"

# Enable DataHub schema querying
use_schema_resolver: true

# Control which features to use (both default to true when schema resolver enabled)
schema_resolver_expand_patterns: true # Expand wildcard patterns
schema_resolver_finegrained_lineage: true # Generate column-level lineage

# DataHub connection (required when use_schema_resolver=true)
datahub_api:
server: "http://localhost:8080"
token: "your-datahub-token" # Optional
```

#### Pattern Expansion

Converts wildcard patterns in connector configurations into actual table names by querying DataHub.

**Example: MySQL Source with Wildcards**

```yml
# Connector config contains pattern
connector.config:
table.include.list: "analytics.user_*" # Pattern: matches user_events, user_profiles, etc.

# DataHub config
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: true
# Result: DataHub queries for MySQL tables matching "analytics.user_*"
# Finds: user_events, user_profiles, user_sessions
# Creates lineage:
# mysql.analytics.user_events -> kafka.server.analytics.user_events
# mysql.analytics.user_profiles -> kafka.server.analytics.user_profiles
# mysql.analytics.user_sessions -> kafka.server.analytics.user_sessions
```

**When to use:**

- Connector configs have wildcard patterns (`database.*`, `schema.table_*`)
- You want accurate lineage without manually listing every table
- Source metadata exists in DataHub from database ingestion

**When to skip:**

- Connector configs use explicit table lists (no patterns)
- Source metadata is not yet in DataHub
- You want faster ingestion without DataHub API calls

**Configuration:**

```yml
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: true # Enable pattern expansion


# If you only want column-level lineage but NOT pattern expansion:
# schema_resolver_expand_patterns: false
```

**Behavior without schema resolver:**
Patterns are treated as literal table names, resulting in potentially incorrect lineage.
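
A minimal sketch of that behavior with a hypothetical wildcard pattern:

```yml
# Without the schema resolver, the pattern is not expanded (hypothetical example)
connector.config:
  table.include.list: "analytics.user_*"
# Resulting lineage points at a single literal dataset name:
#   mysql.analytics.user_* -> kafka.server.analytics.user_*
# which typically does not correspond to any real table or topic
```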

#### Column-Level Lineage

Generates field-level lineage by matching column names between source tables and Kafka topics.

**Example: PostgreSQL to Kafka CDC**

```yml
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_finegrained_lineage: true
# Source table schema in DataHub:
# postgres.public.users: [user_id, email, created_at, updated_at]

# Kafka topic schema in DataHub:
# kafka.server.public.users: [user_id, email, created_at, updated_at]

# Result: Column-level lineage created:
# postgres.public.users.user_id -> kafka.server.public.users.user_id
# postgres.public.users.email -> kafka.server.public.users.email
# postgres.public.users.created_at -> kafka.server.public.users.created_at
# postgres.public.users.updated_at -> kafka.server.public.users.updated_at
```

**Requirements:**

- Source table schema exists in DataHub (from database ingestion)
- Kafka topic schema exists in DataHub (from schema registry or Kafka ingestion)
- Column names match between source and target (case-insensitive matching)

**Benefits:**

- **Impact Analysis**: See which fields are affected by schema changes
- **Data Tracing**: Track specific data elements through pipelines
- **Schema Understanding**: Visualize how data flows at the field level

**ReplaceField Transform Support:**

Column-level lineage respects ReplaceField transforms that filter or rename columns:

```yml
# Connector excludes specific fields
connector.config:
transforms: "removeFields"
transforms.removeFields.type: "org.apache.kafka.connect.transforms.ReplaceField$Value"
transforms.removeFields.exclude: "internal_id,temp_column"
# DataHub behavior:
# Source schema: [user_id, email, internal_id, temp_column]
# After transform: [user_id, email]
# Column lineage created only for: user_id, email
```

**Configuration:**

```yml
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_finegrained_lineage: true # Enable column-level lineage


# If you only want pattern expansion but NOT column-level lineage:
# schema_resolver_finegrained_lineage: false
```

**Behavior without schema resolver:**
Only dataset-level lineage is created (e.g., `postgres.users -> kafka.users`), without field-level detail.

#### Complete Configuration Example

```yml
source:
type: kafka-connect
config:
# Kafka Connect cluster
connect_uri: "http://localhost:8083"
cluster_name: "production-connect"

# Enable schema resolver features
use_schema_resolver: true
schema_resolver_expand_patterns: true # Expand wildcard patterns
schema_resolver_finegrained_lineage: true # Generate column-level lineage

# DataHub connection
datahub_api:
server: "http://datahub.company.com"
token: "${DATAHUB_TOKEN}"

# Platform instances (if using multiple)
platform_instance_map:
postgres: "prod-postgres"
kafka: "prod-kafka"
```

#### Performance Impact

**API Calls per Connector:**

- Pattern expansion: 1 GraphQL query per unique wildcard pattern
- Column-level lineage: 2 GraphQL queries (source schema + target schema)
- Results cached for ingestion run duration

**Optimization:**

```yml
# Minimal configuration - no schema resolver
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    # use_schema_resolver: false # Default - no DataHub queries

---
# Pattern expansion only
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: true
    schema_resolver_finegrained_lineage: false # Skip column lineage for faster ingestion

---
# Column lineage only
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: false # Skip pattern expansion
    schema_resolver_finegrained_lineage: true
```

**Best Practice:**
Run database and Kafka schema ingestion before Kafka Connect ingestion to pre-populate DataHub with schema metadata.
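
A minimal sketch of that ordering, assuming a PostgreSQL database and a Kafka cluster with a schema registry (hostnames and credentials are placeholders; the three recipes are run separately and are only joined with `---` here for brevity):

```yml
# 1. Ingest the source database so table schemas exist in DataHub
source:
  type: postgres
  config:
    host_port: "postgres.company.com:5432"
    database: "analytics"
    username: "${POSTGRES_USER}"
    password: "${POSTGRES_PASSWORD}"

---
# 2. Ingest Kafka topics and their schemas
source:
  type: kafka
  config:
    connection:
      bootstrap: "kafka.company.com:9092"
      schema_registry_url: "http://schema-registry.company.com:8081"

---
# 3. Only then run the kafka-connect recipe with the schema resolver enabled
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    use_schema_resolver: true
```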

#### Troubleshooting

**"Pattern expansion found no matches for: analytics.\*"**

Causes:

- Source database metadata not in DataHub
- Pattern syntax doesn't match DataHub dataset names
- Platform instance mismatch

Solutions:

1. Run database ingestion first to populate DataHub
2. Verify pattern matches table naming in source system
3. Check `platform_instance_map` matches database ingestion config
4. Use an explicit table list to bypass pattern expansion temporarily (see the sketch below)
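
For step 4, a minimal sketch of swapping the pattern for an explicit list in the connector configuration (table names are hypothetical):

```yml
# Before: wildcard pattern that could not be expanded
# connector.config:
#   table.include.list: "analytics.*"

# After: explicit list, no expansion needed
connector.config:
  table.include.list: "analytics.user_events,analytics.user_profiles"
```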

**"SchemaResolver not available: DataHub graph connection is not available"**

Causes:

- Missing `datahub_api` configuration
- DataHub GMS not accessible

Solutions:

```yml
source:
type: kafka-connect
config:
use_schema_resolver: true
datahub_api:
server: "http://localhost:8080" # Add DataHub GMS URL
token: "your-token" # Add if authentication enabled
```

**Column-level lineage not appearing**

Check:

1. Source table schema exists: Search for table in DataHub UI
2. Kafka topic schema exists: Search for topic in DataHub UI
3. Column names match (case differences are handled automatically)
4. Check ingestion logs for warnings about missing schemas

**Slow ingestion with schema resolver enabled**

Profile:

- Check logs for "Schema resolver cache hits: X, misses: Y"
- High misses indicate missing metadata in DataHub

Temporarily disable to compare:

```yml
use_schema_resolver: false
```

### Working with Platform Instances

If you have multiple instances of Kafka or of the source/sink systems referenced in your `kafka-connect` setup, you need to configure platform instances for these systems in the `kafka-connect` recipe to generate correct lineage edges. You must have already set `platform_instance` in the recipes of the original source/sink systems. Refer to [Working with Platform Instances](https://docs.datahub.com/docs/platform-instances) to learn more.