Skip to content

Commit 018bc7c

Browse files
committed
Refactor platform extraction
1 parent 4535b49 commit 018bc7c

File tree

3 files changed

+43
-66
lines changed

3 files changed

+43
-66
lines changed

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/common.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -653,9 +653,8 @@ def supports_connector_class(connector_class: str) -> bool:
653653
"""Check if this connector handles the given class. Override in subclasses."""
654654
return False
655655

656-
@staticmethod
657-
def get_platform(connector_class: str) -> str:
658-
"""Get the platform for this connector type. Override in subclasses."""
656+
def get_platform(self) -> str:
657+
"""Get the platform for this connector instance. Override in subclasses."""
659658
return "unknown"
660659

661660
def _extract_fine_grained_lineage(

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/connector_registry.py

Lines changed: 16 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -43,15 +43,15 @@ class ConnectorRegistry:
4343
def create_schema_resolver(
4444
ctx: Optional["PipelineContext"],
4545
config: KafkaConnectSourceConfig,
46-
connector_manifest: ConnectorManifest,
46+
connector: BaseConnector,
4747
) -> Optional["SchemaResolver"]:
4848
"""
4949
Create SchemaResolver for enhanced lineage extraction if enabled.
5050
5151
Args:
5252
ctx: Pipeline context (contains graph connection)
5353
config: Kafka Connect source configuration
54-
connector_manifest: Connector manifest to determine platform
54+
connector: Connector instance to get platform from
5555
5656
Returns:
5757
SchemaResolver instance if feature is enabled and graph is available, None otherwise
@@ -62,19 +62,16 @@ def create_schema_resolver(
6262
try:
6363
from datahub.sql_parsing.schema_resolver import SchemaResolver
6464

65-
# Determine platform from connector class
66-
connector_class = connector_manifest.config.get("connector.class", "")
67-
platform = ConnectorRegistry._infer_platform_from_connector(
68-
connector_class, connector_manifest
69-
)
65+
# Get platform from connector instance (single source of truth)
66+
platform = connector.get_platform()
7067

7168
# Get platform instance if configured
7269
platform_instance = get_platform_instance(
73-
config, connector_manifest.name, platform
70+
config, connector.connector_manifest.name, platform
7471
)
7572

7673
logger.info(
77-
f"Creating SchemaResolver for connector {connector_manifest.name} "
74+
f"Creating SchemaResolver for connector {connector.connector_manifest.name} "
7875
f"with platform={platform}, instance={platform_instance}"
7976
)
8077

@@ -86,46 +83,11 @@ def create_schema_resolver(
8683
)
8784
except Exception as e:
8885
logger.warning(
89-
f"Failed to create SchemaResolver for connector {connector_manifest.name}: {e}. "
86+
f"Failed to create SchemaResolver for connector {connector.connector_manifest.name}: {e}. "
9087
"Falling back to standard lineage extraction."
9188
)
9289
return None
9390

94-
@staticmethod
95-
def _infer_platform_from_connector(
96-
connector_class: str, manifest: ConnectorManifest
97-
) -> str:
98-
"""Infer the source platform from connector class."""
99-
from datahub.ingestion.source.kafka_connect.source_connectors import (
100-
DebeziumSourceConnector,
101-
)
102-
103-
# Try to get platform from Debezium source connector
104-
if (
105-
"debezium" in connector_class.lower()
106-
or connector_class in CLOUD_JDBC_SOURCE_CLASSES
107-
):
108-
return DebeziumSourceConnector._get_platform_from_connector_class(
109-
connector_class
110-
)
111-
112-
# Default platform inference
113-
connector_lower = connector_class.lower()
114-
if "postgres" in connector_lower:
115-
return "postgres"
116-
elif "mysql" in connector_lower:
117-
return "mysql"
118-
elif "sqlserver" in connector_lower or "mssql" in connector_lower:
119-
return "mssql"
120-
elif "oracle" in connector_lower:
121-
return "oracle"
122-
elif "mongodb" in connector_lower or "mongo" in connector_lower:
123-
return "mongodb"
124-
elif "snowflake" in connector_lower:
125-
return "snowflake"
126-
else:
127-
return "unknown"
128-
12991
@staticmethod
13092
def get_connector_for_manifest(
13193
manifest: ConnectorManifest,
@@ -147,12 +109,7 @@ def get_connector_for_manifest(
147109
"""
148110
connector_class_value = manifest.config.get("connector.class", "")
149111

150-
# Create schema resolver if enabled
151-
schema_resolver = ConnectorRegistry.create_schema_resolver(
152-
ctx, config, manifest
153-
)
154-
155-
# Determine connector type based on manifest type
112+
# Create connector instance first
156113
if manifest.type == SOURCE:
157114
connector = ConnectorRegistry._get_source_connector(
158115
connector_class_value, manifest, config, report
@@ -164,9 +121,13 @@ def get_connector_for_manifest(
164121
else:
165122
return None
166123

167-
# Attach schema resolver to connector if created
168-
if connector and schema_resolver:
169-
connector.schema_resolver = schema_resolver
124+
# Create and attach schema resolver using connector's platform
125+
if connector:
126+
schema_resolver = ConnectorRegistry.create_schema_resolver(
127+
ctx, config, connector
128+
)
129+
if schema_resolver:
130+
connector.schema_resolver = schema_resolver
170131

171132
return connector
172133

@@ -347,7 +308,6 @@ def supports_connector_class(connector_class: str) -> bool:
347308
"""Generic connector supports any unknown class."""
348309
return True
349310

350-
@staticmethod
351-
def get_platform(connector_class: str) -> str:
311+
def get_platform(self) -> str:
352312
"""Generic connectors have configurable platforms."""
353313
return "unknown"

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/source_connectors.py

Lines changed: 25 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1502,6 +1502,23 @@ def _find_topics_by_advanced_prefix_patterns(
15021502

15031503
return matching_topics
15041504

1505+
def get_platform(self) -> str:
1506+
"""
1507+
Get platform for JDBC connector.
1508+
1509+
JDBC connectors can connect to multiple databases, so platform is inferred from
1510+
the connection URL in the connector configuration.
1511+
"""
1512+
try:
1513+
parser = self.get_parser(self.connector_manifest)
1514+
return parser.source_platform
1515+
except Exception:
1516+
# If parser fails, try to infer from JDBC URL directly
1517+
jdbc_url = self.connector_manifest.config.get("connection.url", "")
1518+
if jdbc_url:
1519+
return self._extract_platform_from_jdbc_url(jdbc_url)
1520+
return "unknown"
1521+
15051522

15061523
@dataclass
15071524
class SnowflakeSourceConnector(BaseConnector):
@@ -1931,8 +1948,7 @@ def supports_connector_class(connector_class: str) -> bool:
19311948
"""Check if this connector handles Snowflake Source."""
19321949
return connector_class == SNOWFLAKE_SOURCE_CLOUD
19331950

1934-
@staticmethod
1935-
def get_platform(connector_class: str) -> str:
1951+
def get_platform(self) -> str:
19361952
"""Get the platform for Snowflake Source connector."""
19371953
return "snowflake"
19381954

@@ -1998,6 +2014,10 @@ def extract_lineages(self) -> List[KafkaConnectLineage]:
19982014
lineages.append(lineage)
19992015
return lineages
20002016

2017+
def get_platform(self) -> str:
2018+
"""Get the platform for Mongo Source connector."""
2019+
return "mongodb"
2020+
20012021

20022022
@dataclass
20032023
class DebeziumSourceConnector(BaseConnector):
@@ -2036,10 +2056,8 @@ def get_parser(
20362056
self,
20372057
connector_manifest: ConnectorManifest,
20382058
) -> DebeziumParser:
2039-
connector_class = connector_manifest.config.get(CONNECTOR_CLASS, "")
2040-
20412059
# Map connector class to platform
2042-
platform = self._get_platform_from_connector_class(connector_class)
2060+
platform = self.get_platform()
20432061

20442062
# Map handler platform to parser platform (handler uses "sqlserver", parser expects "mssql")
20452063
parser_platform = "mssql" if platform == "sqlserver" else platform
@@ -2108,9 +2126,9 @@ def _get_database_name_for_platform(
21082126
# postgres, oracle, db2 use database.dbname
21092127
return config.get("database.dbname")
21102128

2111-
@staticmethod
2112-
def _get_platform_from_connector_class(connector_class: str) -> str:
2129+
def get_platform(self) -> str:
21132130
"""Map Debezium connector class to platform name."""
2131+
connector_class = self.connector_manifest.config.get(CONNECTOR_CLASS, "")
21142132
# Map based on well-known Debezium connector classes
21152133
if "mysql" in connector_class.lower():
21162134
return "mysql"

0 commit comments

Comments
 (0)