Skip to content

Commit 45b970e

Browse files
author
Abdullah Tariq
committed
feat(dbt): add source_pattern filtering for database and schema patterns
1 parent 8f53dd9 commit 45b970e

File tree

7 files changed

+2910
-7995
lines changed

7 files changed

+2910
-7995
lines changed

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,23 @@ def can_emit_model_performance(self) -> bool:
246246
return self.model_performance == EmitDirective.YES
247247

248248

249+
class SourcePatternConfig(ConfigModel):
250+
"""Configuration for filtering materialized nodes based on their physical location"""
251+
252+
database_pattern: AllowDenyPattern = Field(
253+
default=AllowDenyPattern.allow_all(),
254+
description="Regex patterns for database names to filter materialized nodes.",
255+
)
256+
schema_pattern: AllowDenyPattern = Field(
257+
default=AllowDenyPattern.allow_all(),
258+
description="Regex patterns for schema names in format '{database}.{schema}' to filter materialized nodes.",
259+
)
260+
table_pattern: AllowDenyPattern = Field(
261+
default=AllowDenyPattern.allow_all(),
262+
description="Regex patterns for table names in format '{database}.{schema}.{table}' to filter materialized nodes.",
263+
)
264+
265+
249266
class DBTCommonConfig(
250267
StatefulIngestionConfigBase,
251268
PlatformInstanceConfigMixin,
@@ -294,14 +311,10 @@ class DBTCommonConfig(
294311
default=AllowDenyPattern.allow_all(),
295312
description="regex patterns for dbt model names to filter in ingestion.",
296313
)
297-
database_pattern: AllowDenyPattern = Field(
298-
default=AllowDenyPattern.allow_all(),
299-
description="Regex patterns for database (i.e. project_id) to filter in ingestion.",
300-
)
301-
schema_pattern: AllowDenyPattern = Field(
302-
default=AllowDenyPattern.allow_all(),
303-
description="Regex patterns for schema to filter in ingestion. Specify regex to only match the schema name. "
304-
"e.g. to match all tables in schema analytics, use the regex 'analytics'",
314+
source_pattern: SourcePatternConfig = Field(
315+
default=SourcePatternConfig(),
316+
description="Advanced filtering for materialized nodes based on their physical database location. "
317+
"Provides fine-grained control over database.schema.table patterns for catalog consistency.",
305318
)
306319
meta_mapping: Dict = Field(
307320
default={},
@@ -1028,13 +1041,53 @@ def get_workunits_internal(
10281041
)
10291042

10301043
def _is_allowed_node(self, node: DBTNode) -> bool:
1031-
return (
1032-
self.config.node_name_pattern.allowed(node.dbt_name)
1033-
and (not node.schema or self.config.schema_pattern.allowed(node.schema))
1034-
and (
1035-
not node.database or self.config.database_pattern.allowed(node.database)
1036-
)
1037-
)
1044+
# First: Internal dbt reference filtering
1045+
if not self.config.node_name_pattern.allowed(node.dbt_name):
1046+
return False
1047+
1048+
# Second: Materialized location filtering (for catalog consistency)
1049+
if not self._is_allowed_materialized_node(node):
1050+
return False
1051+
1052+
return True
1053+
1054+
def _is_allowed_materialized_node(self, node: DBTNode) -> bool:
1055+
"""Filter nodes based on their materialized database location for catalog consistency"""
1056+
1057+
# Skip if using default patterns (no filtering intended)
1058+
if (
1059+
self.config.source_pattern.database_pattern == AllowDenyPattern.allow_all()
1060+
and self.config.source_pattern.schema_pattern
1061+
== AllowDenyPattern.allow_all()
1062+
and self.config.source_pattern.table_pattern == AllowDenyPattern.allow_all()
1063+
):
1064+
return True
1065+
1066+
# Database level filtering
1067+
if node.database and not self.config.source_pattern.database_pattern.allowed(
1068+
node.database
1069+
):
1070+
return False
1071+
1072+
# Schema level filtering: {database}.{schema}
1073+
db_fqn = node._join_parts([node.database, node.schema])
1074+
if (
1075+
node.database
1076+
and node.schema
1077+
and not self.config.source_pattern.schema_pattern.allowed(db_fqn)
1078+
):
1079+
return False
1080+
1081+
# Table level filtering: {database}.{schema}.{table}
1082+
if (
1083+
node.database
1084+
and node.schema
1085+
and node.name
1086+
and not self.config.source_pattern.table_pattern.allowed(node.get_db_fqn())
1087+
):
1088+
return False
1089+
1090+
return True
10381091

10391092
def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
10401093
nodes: List[DBTNode] = []

0 commit comments

Comments
 (0)