Skip to content

Commit

Permalink
Feature/49 add dbt selection (#50)
Browse files Browse the repository at this point in the history
* chore: minor docs correction

* feat: add dbt selection + some code refactors

* test: get 100% code cov

* docs: update cli references

* fix: lint
  • Loading branch information
datnguye authored Sep 8, 2023
1 parent 6813ab0 commit bed433d
Show file tree
Hide file tree
Showing 26 changed files with 1,210 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/target
/samples/local
CHANGELOG.md
/dbt_packages

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
1 change: 1 addition & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ currently being supported with security updates.

| Version | Supported |
| ------- | ------------------ |
| 1.3+ | :white_check_mark: |
| 1.2.x | :white_check_mark: |
| 1.1.x | :white_check_mark: |
| 1.0.x | :white_check_mark: |
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion dbterd/adapters/algos/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import copy

from dbterd.adapters.algos.meta import Column, Table
from dbterd.adapters.meta import Column, Table


def get_tables(manifest, catalog):
Expand Down
12 changes: 8 additions & 4 deletions dbterd/adapters/algos/test_relationship.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from dbterd.adapters.algos import base
from dbterd.adapters.algos.filter import is_selected_table
from dbterd.adapters.algos.meta import Ref
from dbterd.adapters.filter import is_selected_table
from dbterd.adapters.meta import Ref
from dbterd.constants import (
DEFAULT_ALGO_RULE,
TEST_META_IGNORE_IN_ERD,
TEST_META_RELATIONSHIP_TYPE,
)
from dbterd.helpers.log import logger


def parse(manifest, catalog, **kwargs):
Expand All @@ -27,9 +28,9 @@ def parse(manifest, catalog, **kwargs):
for table in tables
if is_selected_table(
table=table,
select_rules=(kwargs.get("select") or []),
select_rules=kwargs.get("select") or [],
resource_types=kwargs.get("resource_type", []),
exclude_rules=kwargs.get("exclude", []),
exclude_rules=kwargs.get("exclude") or [],
)
]

Expand All @@ -47,6 +48,9 @@ def parse(manifest, catalog, **kwargs):
tables=tables, relationships=relationships
)

logger.info(
f"Collected {len(tables)} table(s) and {len(relationships)} relationship(s)"
)
return (tables, relationships)


Expand Down
108 changes: 94 additions & 14 deletions dbterd/adapters/base.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import abc
import os
from pathlib import Path

import click

from dbterd.adapters import factory
from dbterd import default
from dbterd.adapters import adapter
from dbterd.adapters.dbt_invocation import DbtInvocation
from dbterd.adapters.filter import has_unsupported_rule
from dbterd.helpers import cli_messaging
from dbterd.helpers import file as file_handlers
from dbterd.helpers.log import logger


class Executor(abc.ABC):
"""Base executor"""
class Executor:
"""Main Executor"""

ctx: click.Context

Expand All @@ -19,11 +23,82 @@ def __init__(self, ctx) -> None:
self.filename_manifest = "manifest.json"
self.filename_catalog = "catalog.json"

@abc.abstractmethod
def run(self, **kwargs):
"""Main function helps to run by the target strategy"""
kwargs = self.evaluate_kwargs(**kwargs)
self.__run_by_strategy(**kwargs)

def evaluate_kwargs(self, **kwargs) -> dict:
"""Re-calculate the options
Raises:
click.UsageError: Not Supported exception
Returns:
dict: kwargs dict
"""
artifacts_dir, dbt_project_dir = self.__get_dir(**kwargs)
logger.info(f"Using dbt artifact dir at: {artifacts_dir}")
logger.info(f"Using dbt project dir at: {dbt_project_dir}")

select = list(kwargs.get("select")) or []
exclude = list(kwargs.get("exclude")) or []
if kwargs.get("dbt"):
select = self.__get_selection(**kwargs)
exclude = []
else:
unsupported, rule = has_unsupported_rule(
rules=select.extend(exclude) if exclude else select
)
if unsupported:
message = f"Unsupported Selection found: {rule}"
logger.error(message)
raise click.UsageError(message)

kwargs["artifacts_dir"] = artifacts_dir
kwargs["dbt_project_dir"] = dbt_project_dir
kwargs["select"] = select
kwargs["exclude"] = exclude

return kwargs

def __get_dir(self, **kwargs) -> str:
"""Calculate the dbt artifact directory and dbt project directory
Returns:
tuple(str, str): Path to target directory and dbt project directory
"""
artifact_dir = (
f"{kwargs.get('artifacts_dir') or kwargs.get('dbt_project_dir')}" # default
)
project_dir = (
f"{kwargs.get('dbt_project_dir') or kwargs.get('artifacts_dir')}" # default
)

if not artifact_dir:
return (
default.default_artifact_path(),
str(Path(default.default_artifact_path()).parent.absolute()),
)

artifact_dir = Path(artifact_dir).absolute()
project_dir = Path(project_dir).absolute()

if not os.path.isfile(f"{artifact_dir}/{self.filename_manifest}"):
artifact_dir = f"{project_dir}/target" # try child target

return (str(artifact_dir), str(project_dir))

def __get_selection(self, **kwargs):
"""Override the Selection using dbt's one with `--dbt`"""
return DbtInvocation(
dbt_project_dir=kwargs.get("dbt_project_dir"),
dbt_target=kwargs.get("dbt_target"),
).get_selection(
select_rules=kwargs.get("select"),
exclude_rules=kwargs.get("exclude"),
)

def __read_manifest(self, mp: str, mv: int = None):
"""Read the Manifest content
Expand Down Expand Up @@ -55,24 +130,29 @@ def __read_catalog(self, cp: str, cv: int = None):

def __run_by_strategy(self, **kwargs):
"""Read artifacts and export the diagram file following the target"""
target = factory.load_executor(name=kwargs["target"]) # import {target}
target = adapter.load_executor(name=kwargs["target"]) # import {target}
run_operation_dispatcher = getattr(target, "run_operation_dispatcher")
operation_default = getattr(target, "run_operation_default")
operation = run_operation_dispatcher.get(
f"{kwargs['target']}_{kwargs['algo'].split(':')[0]}",
operation_default,
)

manifest = self.__read_manifest(
mp=kwargs.get("manifest_path") or kwargs["artifacts_dir"],
mv=kwargs["manifest_version"],
mp=kwargs.get("artifacts_dir"),
mv=kwargs.get("manifest_version"),
)
catalog = self.__read_catalog(
cp=kwargs.get("manifest_path") or kwargs["artifacts_dir"],
cv=kwargs["catalog_version"],
cp=kwargs.get("artifacts_dir"),
cv=kwargs.get("catalog_version"),
)

result = operation(manifest=manifest, catalog=catalog, **kwargs)
path = kwargs["output"] + f"/{result[0]}"
with open(path, "w") as f:
logger.info(path)
f.write(result[1])
path = kwargs.get("output") + f"/{result[0]}"
try:
with open(path, "w") as f:
logger.info(path)
f.write(result[1])
except Exception as e:
logger.error(str(e))
raise click.FileError(f"Could not save the output: {str(e)}")
76 changes: 76 additions & 0 deletions dbterd/adapters/dbt_invocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
import importlib.util
from importlib.metadata import version
from pathlib import Path
from typing import List

import click
from dbt.cli.main import dbtRunner, dbtRunnerResult

from dbterd.helpers.log import logger


class DbtInvocation:
"""Runner of dbt (https://docs.getdbt.com/reference/programmatic-invocations)"""

def __init__(self, dbt_project_dir: str = None, dbt_target: str = None) -> None:
"""Initialization
Args:
dbt_project_dir (str, optional): Custom dbt project directory path. Defaults to None.
dbt_target (str, optional): Custom dbt target name. Defaults to None - using default target
"""
self.__ensure_dbt_installed()
self.dbt = dbtRunner()
self.project_dir = (
dbt_project_dir or os.environ.get("DBT_PROJECT_DIR") or str(Path.cwd())
)
self.target = dbt_target

def __ensure_dbt_installed(self):
dbt_spec = importlib.util.find_spec("dbt")
if dbt_spec and dbt_spec.loader:
installed_path = dbt_spec.submodule_search_locations[0]
logger.debug(
f"Found dbt v{version('dbt-core')} installed at {installed_path}"
)
else:
message = (
"dbt module is not found or unsupported version, "
"please try to install dbt-core v1.5 or later, "
"OR let's try again without `--dbt` flag"
)
logger.error(message)
raise click.UsageError(message)

def get_selection(
self, select_rules: List[str] = [], exclude_rules: List[str] = []
) -> List[str]:
"""Get dbt selected models
Args:
select_rules (List[str], optional): Model inclusives. Defaults to [].
exclude_rules (List[str], optional): Model exclusives. Defaults to [].
Returns:
List[str]: Selected node names with 'exact' rule
"""
args = ["ls", "--project-dir", self.project_dir, "--resource-type", "model"]
if select_rules:
args.extend(["--select", " ".join(select_rules)])
if exclude_rules:
args.extend(["--exclude", " ".join(exclude_rules)])
if self.target:
args.extend(["--target", self.target])

logger.info(f"Invoking: `dbt {' '.join(args)}` at {self.project_dir}")
r: dbtRunnerResult = self.dbt.invoke(args)

if not r.success:
logger.error(str(r))
raise click.UsageError("str(r)")

return [
f"exact:model.{str(x).split('.')[0]}.{str(x).split('.')[-1]}"
for x in r.result
]
55 changes: 48 additions & 7 deletions dbterd/adapters/algos/filter.py → dbterd/adapters/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@
from fnmatch import fnmatch
from typing import List

from dbterd.adapters.algos.meta import Table
from dbterd.adapters.meta import Table

RULE_FUNC_PREFIX = "is_satisfied_by_"


def has_unsupported_rule(rules: List[str] = []) -> bool:
"""Verify if existing the unsupported selection rule
Args:
rules (List[str]): Any (selection or/and exclusion) rules
Returns:
bool: True if existing any unsupported one
"""
for rule in rules:
type = rule.split(":")
if len(type) == 1:
continue
rule_func = f"{RULE_FUNC_PREFIX}{type[0]}"
if not hasattr(sys.modules[__name__], rule_func):
return (True, type[0])

return (False, None)


def is_selected_table(
Expand All @@ -28,6 +50,7 @@ def is_selected_table(
selected = any([evaluate_rule(table=table, rule=rule) for rule in select_rules])
if resource_types:
selected = selected and table.resource_type in resource_types

# Exclusion
excluded = False
if exclude_rules:
Expand Down Expand Up @@ -55,12 +78,15 @@ def evaluate_rule(table: Table, rule: str):
type, rule = "name", rule_parts[0]
if len(rule_parts) > 1:
type, rule = tuple(rule_parts[:2])
selected_func = getattr(sys.modules[__name__], f"__is_satisfied_by_{type}")

rule_func = f"{RULE_FUNC_PREFIX}{type}"
selected_func = getattr(sys.modules[__name__], rule_func)
results.append(selected_func(table=table, rule=rule))

return all(results)


def __is_satisfied_by_name(table: Table, rule: str = ""):
def is_satisfied_by_name(table: Table, rule: str = ""):
"""Evaluate rule by Name
Args:
Expand All @@ -75,7 +101,22 @@ def __is_satisfied_by_name(table: Table, rule: str = ""):
return table.name.startswith(rule)


def __is_satisfied_by_schema(table: Table, rule: str = ""):
def is_satisfied_by_exact(table: Table, rule: str = ""):
"""Evaluate rule by model name with exact match
Args:
table (Table): Table object
rule (str, optional): Rule def. Defaults to "".
Returns:
bool: True if satisfied `equal` logic applied to Table name
"""
if not rule:
return True
return table.name == rule


def is_satisfied_by_schema(table: Table, rule: str = ""):
"""Evaluate rule by Schema name
Args:
Expand All @@ -96,7 +137,7 @@ def __is_satisfied_by_schema(table: Table, rule: str = ""):
)


def __is_satisfied_by_wildcard(table: Table, rule: str = "*"):
def is_satisfied_by_wildcard(table: Table, rule: str = "*"):
"""Evaluate rule by Wildcard (Unix Style)
Args:
Expand All @@ -111,15 +152,15 @@ def __is_satisfied_by_wildcard(table: Table, rule: str = "*"):
return fnmatch(table.name, rule)


def __is_satisfied_by_exposure(table: Table, rule: str = ""):
def is_satisfied_by_exposure(table: Table, rule: str = ""):
"""Evaluate rule by dbt Exposure name
Args:
table (Table): Table object
rule (str, optional): Rule def. Defaults to "".
Returns:
bool: True if satisfied table name matched the pattern
bool: True if satisfied exposure name exists in the table's exposures
"""
if not rule:
return True
Expand Down
Loading

0 comments on commit bed433d

Please sign in to comment.