Skip to content

Commit 1fd9c81

Browse files
abdullahtariqq, Abdullah Tariq, and skrydal
authored and committed
feat(dbt): add filtering for materialized nodes based on their physical location (#14689)
Co-authored-by: Abdullah Tariq <[email protected]> Co-authored-by: skrydal <[email protected]>
1 parent 48be21c commit 1fd9c81

File tree

6 files changed

+7230
-5
lines changed

6 files changed

+7230
-5
lines changed

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,23 @@ def can_emit_model_performance(self) -> bool:
246246
return self.model_performance == EmitDirective.YES
247247

248248

249+
class MaterializedNodePatternConfig(ConfigModel):
    """Configuration for filtering materialized nodes based on their physical location"""

    # All three patterns default to "allow everything", so an unconfigured
    # instance filters nothing out.

    # Database level: evaluated against the bare database name.
    database_pattern: AllowDenyPattern = Field(
        description="Regex patterns for database names to filter materialized nodes.",
        default=AllowDenyPattern.allow_all(),
    )

    # Schema level: evaluated against '{database}.{schema}'.
    schema_pattern: AllowDenyPattern = Field(
        description=(
            "Regex patterns for schema names in format '{database}.{schema}' "
            "to filter materialized nodes."
        ),
        default=AllowDenyPattern.allow_all(),
    )

    # Table level: evaluated against '{database}.{schema}.{table}'.
    table_pattern: AllowDenyPattern = Field(
        description=(
            "Regex patterns for table/view names in format "
            "'{database}.{schema}.{table}' to filter materialized nodes."
        ),
        default=AllowDenyPattern.allow_all(),
    )
264+
265+
249266
class DBTCommonConfig(
250267
StatefulIngestionConfigBase,
251268
PlatformInstanceConfigMixin,
@@ -294,6 +311,11 @@ class DBTCommonConfig(
294311
default=AllowDenyPattern.allow_all(),
295312
description="regex patterns for dbt model names to filter in ingestion.",
296313
)
314+
materialized_node_pattern: MaterializedNodePatternConfig = Field(
315+
default=MaterializedNodePatternConfig(),
316+
description="Advanced filtering for materialized nodes based on their physical database location. "
317+
"Provides fine-grained control over database.schema.table patterns for catalog consistency.",
318+
)
297319
meta_mapping: Dict = Field(
298320
default={},
299321
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
@@ -1018,15 +1040,53 @@ def get_workunits_internal(
10181040
all_nodes_map,
10191041
)
10201042

1021-
def _is_allowed_node(self, key: str) -> bool:
1022-
return self.config.node_name_pattern.allowed(key)
1043+
def _is_allowed_node(self, node: DBTNode) -> bool:
    """
    Decide whether a node should be processed, applying the filters in layers.

    A node is accepted only when its dbt name passes ``node_name_pattern``
    and it also passes ``_is_allowed_materialized_node``. The second check
    is not evaluated when the name is already rejected.

    NOTE: the materialized-node checks might need to be restricted to some
    cases in the future.
    """
    name_allowed = self.config.node_name_pattern.allowed(node.dbt_name)
    if name_allowed and self._is_allowed_materialized_node(node):
        return True
    return False
1054+
1055+
def _is_allowed_materialized_node(self, node: DBTNode) -> bool:
    """Filter nodes based on their materialized database location for catalog consistency.

    The checks run from coarse to fine: database, then '{database}.{schema}',
    then the value returned by ``node.get_db_fqn()``. As soon as the next
    component (database, schema or name) is missing, the node is accepted
    without evaluating the remaining, finer-grained patterns.

    Returns:
        False if any evaluated pattern rejects the node, True otherwise.
    """
    # No database recorded: nothing to match against, so let the node through.
    if not node.database:
        return True

    location_patterns = self.config.materialized_node_pattern

    # Database level filtering
    if not location_patterns.database_pattern.allowed(node.database):
        return False

    # Schema level filtering: {database}.{schema}
    if not node.schema:
        return True
    schema_fqn = node._join_parts([node.database, node.schema])
    if not location_patterns.schema_pattern.allowed(schema_fqn):
        return False

    # Table level filtering: {database}.{schema}.{table}
    if not node.name:
        return True
    if location_patterns.table_pattern.allowed(node.get_db_fqn()):
        return True
    return False
10231083

10241084
def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
10251085
nodes: List[DBTNode] = []
10261086
for node in all_nodes:
10271087
key = node.dbt_name
10281088

1029-
if not self._is_allowed_node(key):
1089+
if not self._is_allowed_node(node):
10301090
self.report.nodes_filtered.append(key)
10311091
continue
10321092

@@ -1118,8 +1178,8 @@ def add_node_to_cll_list(dbt_name: str) -> None:
11181178
cll_nodes.add(dbt_name)
11191179
schema_nodes.add(dbt_name)
11201180

1121-
for dbt_name in all_nodes_map:
1122-
if self._is_allowed_node(dbt_name):
1181+
for dbt_name, dbt_node in all_nodes_map.items():
1182+
if self._is_allowed_node(dbt_node):
11231183
add_node_to_cll_list(dbt_name)
11241184

11251185
return schema_nodes, cll_nodes

0 commit comments

Comments
 (0)