Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,23 @@ def can_emit_model_performance(self) -> bool:
return self.model_performance == EmitDirective.YES


class MaterializedNodePatternConfig(ConfigModel):
    """Configuration for filtering materialized nodes based on their physical location"""

    # Three independent allow/deny filters, from coarsest to finest. Each one
    # defaults to "allow everything", so an empty config filters nothing.

    # Matched against the bare database name.
    database_pattern: AllowDenyPattern = Field(
        description="Regex patterns for database names to filter materialized nodes.",
        default=AllowDenyPattern.allow_all(),
    )

    # Matched against the qualified schema name, i.e. '{database}.{schema}'.
    schema_pattern: AllowDenyPattern = Field(
        description="Regex patterns for schema names in format '{database}.{schema}' to filter materialized nodes.",
        default=AllowDenyPattern.allow_all(),
    )

    # Matched against the fully qualified name, i.e. '{database}.{schema}.{table}'.
    table_pattern: AllowDenyPattern = Field(
        description="Regex patterns for table/view names in format '{database}.{schema}.{table}' to filter materialized nodes.",
        default=AllowDenyPattern.allow_all(),
    )


class DBTCommonConfig(
StatefulIngestionConfigBase,
PlatformInstanceConfigMixin,
Expand Down Expand Up @@ -294,6 +311,11 @@ class DBTCommonConfig(
default=AllowDenyPattern.allow_all(),
description="regex patterns for dbt model names to filter in ingestion.",
)
materialized_node_pattern: MaterializedNodePatternConfig = Field(
default=MaterializedNodePatternConfig(),
description="Advanced filtering for materialized nodes based on their physical database location. "
"Provides fine-grained control over database.schema.table patterns for catalog consistency.",
)
meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
Expand Down Expand Up @@ -1018,15 +1040,53 @@ def get_workunits_internal(
all_nodes_map,
)

def _is_allowed_node(self, key: str) -> bool:
return self.config.node_name_pattern.allowed(key)
def _is_allowed_node(self, node: DBTNode) -> bool:
    """Decide whether ``node`` passes every configured node filter.

    A node is kept only when its dbt name is allowed by
    ``node_name_pattern`` and it also passes the materialized-location
    check. The location check is currently applied to every node; it may
    need to be limited to specific cases in the future.
    """
    name_allowed = self.config.node_name_pattern.allowed(node.dbt_name)
    # `and` short-circuits: the location check only runs once the name passed.
    return bool(name_allowed and self._is_allowed_materialized_node(node))

def _is_allowed_materialized_node(self, node: DBTNode) -> bool:
    """Check a node's materialized location against ``materialized_node_pattern``.

    The checks run from coarsest to finest: database, then
    '{database}.{schema}', then the name returned by ``node.get_db_fqn()``.
    As soon as the node is missing the part needed for the next level
    (database, schema or name), it is accepted without further checks.

    Returns:
        False if any evaluated pattern rejects the node, True otherwise.
    """
    database = node.database
    if not database:
        # Nothing to match against, so no location filter applies.
        return True

    location_filters = self.config.materialized_node_pattern
    if not location_filters.database_pattern.allowed(database):
        return False

    schema = node.schema
    if not schema:
        return True

    # Schema patterns see the database-qualified schema name.
    qualified_schema = node._join_parts([database, schema])
    if not location_filters.schema_pattern.allowed(qualified_schema):
        return False

    if not node.name:
        return True

    # Last level: the verdict of the table pattern is the final answer.
    return bool(location_filters.table_pattern.allowed(node.get_db_fqn()))

def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
nodes: List[DBTNode] = []
for node in all_nodes:
key = node.dbt_name

if not self._is_allowed_node(key):
if not self._is_allowed_node(node):
self.report.nodes_filtered.append(key)
continue

Expand Down Expand Up @@ -1118,8 +1178,8 @@ def add_node_to_cll_list(dbt_name: str) -> None:
cll_nodes.add(dbt_name)
schema_nodes.add(dbt_name)

for dbt_name in all_nodes_map:
if self._is_allowed_node(dbt_name):
for dbt_name, dbt_node in all_nodes_map.items():
if self._is_allowed_node(dbt_node):
add_node_to_cll_list(dbt_name)

return schema_nodes, cll_nodes
Expand Down
Loading
Loading