Skip to content

Commit

Permalink
Merge pull request #193 from analyst-collective/fix/incremental-type-…
Browse files Browse the repository at this point in the history
…expansion

Fix/incremental type expansion
  • Loading branch information
drewbanin authored Oct 27, 2016
2 parents c8c049e + 11d9a3d commit 6fc3011
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 39 deletions.
6 changes: 2 additions & 4 deletions dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
'modules-path': 'dbt_modules'
}

default_profiles = {
'user': {}
}
default_profiles = {}

class DbtProjectError(Exception):
def __init__(self, message, project):
Expand All @@ -43,7 +41,7 @@ def __init__(self, cfg, profiles, profile_to_load=None):
self.profile_to_load = self.cfg['profile']

if self.profile_to_load is None:
self.profile_to_load = 'user'
raise DbtProjectError("No profile was supplied in the dbt_project.yml file, or the command line", self)

if self.profile_to_load in self.profiles:
self.cfg.update(self.profiles[self.profile_to_load])
Expand Down
1 change: 1 addition & 0 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def __init__(self, project, target_path, graph_type, threads):
"invocation_id" : dbt.tracking.invocation_id,
"get_columns_in_table" : self.schema.get_columns_in_table,
"get_missing_columns" : self.schema.get_missing_columns,
"already_exists" : self.schema.table_exists,
}


Expand Down
17 changes: 17 additions & 0 deletions dbt/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def __init__(self, column, dtype, char_size):
def name(self):
return self.column

@property
def quoted(self):
return '"{}"'.format(self.column)

@property
def data_type(self):
if self.is_string():
Expand Down Expand Up @@ -69,6 +73,7 @@ def __init__(self, project, target):
self.logger = logging.getLogger(__name__)

self.schema_cache = {}
self.runtime_existing = self.query_for_existing(self.target.schema)

def cache_table_columns(self, schema, table, columns):
tid = (schema, table)
Expand Down Expand Up @@ -285,3 +290,15 @@ def expand_column_types_if_needed(self, temp_table, to_schema, to_table):
self.logger.debug("Changing col type from %s to %s in table %s.%s", dest_column.data_type, new_type, to_schema, to_table)
self.alter_column_type(to_schema, to_table, column_name, new_type)

# update these cols in the cache! This is a hack to fix broken incremental models for type expansion. TODO
self.cache_table_columns(to_schema, to_table, source_columns)

def table_exists(self, schema, table):
if schema == self.target.schema:
exists = self.runtime_existing.get(table) is not None
return exists
else:
tables = self.query_for_existing(schema)
exists = tables.get(table) is not None
return exists

2 changes: 1 addition & 1 deletion dbt/task/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __pull_deps_recursive(self, repos, processed_repos = set(), i=0):
dep_project = project.read_project(
os.path.join(self.project['modules-path'],
dep_folder,
'dbt_project.yml')
'dbt_project.yml'), profile_to_load=self.project.profile_to_load
)
processed_repos.add(dep_folder)
self.__pull_deps_recursive(dep_project['repositories'], processed_repos, i+1)
Expand Down
52 changes: 19 additions & 33 deletions dbt/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ class BaseCreateTemplate(object):
{query}
);"""

# Distribution style, sort keys,BACKUP, and NULL properties are inherited by LIKE tables,
# but you cannot explicitly set them in the CREATE TABLE ... LIKE statement.
# via http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
incremental_template = """
create temporary table "{identifier}__dbt_incremental_empty_tmp" {dist_qualifier} {sort_qualifier} as (
select * from (
{query}
) as tmp limit 0
{{% if not already_exists("{schema}", "{identifier}") %}}
create table "{schema}"."{identifier}" {dist_qualifier} {sort_qualifier} as (
{query}
);
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_empty_tmp");
{{% else %}}
create temporary table "{identifier}__dbt_incremental_tmp" as (
with dbt_incr_sbq as (
Expand All @@ -27,12 +25,20 @@ class BaseCreateTemplate(object):
-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }}
{{% set dest_columns = get_columns_in_table("{schema}", "{identifier}") %}}
{{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}}
{incremental_delete_statement}
insert into "{schema}"."{identifier}" (
select * from "{identifier}__dbt_incremental_tmp"
insert into "{schema}"."{identifier}" ({{{{ dest_cols_csv }}}})
(
select {{{{ dest_cols_csv }}}}
from "{identifier}__dbt_incremental_tmp"
);
"""
{{% endif %}}
"""

incremental_delete_template = """
delete from "{schema}"."{identifier}" where ({unique_key}) in (
Expand Down Expand Up @@ -110,30 +116,10 @@ class DryCreateTemplate(object):


incremental_template = """
create temporary table "{identifier}__dbt_incremental_empty_tmp" {dist_qualifier} {sort_qualifier} as (
create table "{schema}"."{identifier}" {dist_qualifier} {sort_qualifier} as (
select * from (
{query}
) as tmp limit 0
);
create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_empty_tmp");
create temporary table "{identifier}__dbt_incremental_tmp" as (
with dbt_incr_sbq as (
{query}
)
select * from dbt_incr_sbq
where ({sql_where}) or ({sql_where}) is null
limit 0
);
-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }}
{incremental_delete_statement}
insert into "{schema}"."{identifier}" (
select * from "{identifier}__dbt_incremental_tmp"
) s limit 0
);
"""

Expand Down
7 changes: 6 additions & 1 deletion dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dbt.project
import pprint
import json
import dbt.project

DBTConfigKeys = [
'enabled',
Expand Down Expand Up @@ -81,7 +82,11 @@ def dependency_projects(project):
for obj in os.listdir(project['modules-path']):
full_obj = os.path.join(project['modules-path'], obj)
if os.path.isdir(full_obj):
yield dbt.project.read_project(os.path.join(full_obj, 'dbt_project.yml'))
try:
yield dbt.project.read_project(os.path.join(full_obj, 'dbt_project.yml'), profile_to_load=project.profile_to_load)
except dbt.project.DbtProjectError as e:
print("Error reading dependency project at {}".format(full_obj))
print(str(e))

def split_path(path):
norm = os.path.normpath(path)
Expand Down

0 comments on commit 6fc3011

Please sign in to comment.