Skip to content

Commit b7cbd87

Browse files
Abdullah Tariq and claude committed
feat(dbt): add database and schema pattern filtering
- Add database_pattern and schema_pattern config fields with AllowDenyPattern support
- Enhance _is_allowed_node() to filter nodes by database and schema in addition to node names
- Add comprehensive integration tests for new filtering capabilities
- Support combined filtering patterns for fine-grained dbt ingestion control

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent c1694a3 commit b7cbd87

File tree

5 files changed

+11964
-4
lines changed

5 files changed

+11964
-4
lines changed

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -294,6 +294,15 @@ class DBTCommonConfig(
294294
default=AllowDenyPattern.allow_all(),
295295
description="regex patterns for dbt model names to filter in ingestion.",
296296
)
297+
database_pattern: AllowDenyPattern = Field(
298+
default=AllowDenyPattern.allow_all(),
299+
description="Regex patterns for database (i.e. project_id) to filter in ingestion.",
300+
)
301+
schema_pattern: AllowDenyPattern = Field(
302+
default=AllowDenyPattern.allow_all(),
303+
description="Regex patterns for schema to filter in ingestion. Specify regex to only match the schema name. "
304+
"e.g. to match all tables in schema analytics, use the regex 'analytics'",
305+
)
297306
meta_mapping: Dict = Field(
298307
default={},
299308
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
@@ -1018,15 +1027,21 @@ def get_workunits_internal(
10181027
all_nodes_map,
10191028
)
10201029

1021-
def _is_allowed_node(self, key: str) -> bool:
1022-
return self.config.node_name_pattern.allowed(key)
1030+
def _is_allowed_node(self, node: DBTNode) -> bool:
1031+
return (
1032+
self.config.node_name_pattern.allowed(node.dbt_name)
1033+
and (not node.schema or self.config.schema_pattern.allowed(node.schema))
1034+
and (
1035+
not node.database or self.config.database_pattern.allowed(node.database)
1036+
)
1037+
)
10231038

10241039
def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
10251040
nodes: List[DBTNode] = []
10261041
for node in all_nodes:
10271042
key = node.dbt_name
10281043

1029-
if not self._is_allowed_node(key):
1044+
if not self._is_allowed_node(node):
10301045
self.report.nodes_filtered.append(key)
10311046
continue
10321047

@@ -1119,7 +1134,7 @@ def add_node_to_cll_list(dbt_name: str) -> None:
11191134
schema_nodes.add(dbt_name)
11201135

11211136
for dbt_name in all_nodes_map:
1122-
if self._is_allowed_node(dbt_name):
1137+
if self._is_allowed_node(all_nodes_map.get(dbt_name)):
11231138
add_node_to_cll_list(dbt_name)
11241139

11251140
return schema_nodes, cll_nodes

0 commit comments

Comments
 (0)