Ele 2204 bug timestamp column as sql expression doesnt work #637

Merged
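The underlying bug: as the removed macro lines below show, the monitoring query interpolated the configured timestamp_column on both sides of the SQL "as" keyword, which only works when the value is a bare column name. When the configuration holds an SQL expression, that expression also ends up as the column alias and the rendered query is invalid. The fix aliases the expression once to the fixed name monitored_table_timestamp_column (and monitored_table_event_timestamp_column) and references the alias downstream. The following is a minimal, hypothetical Python sketch of the failure mode; it models the rendering as plain string formatting rather than the actual Jinja macros, and the table name is a placeholder:

# Hypothetical sketch of the failure mode; the real rendering is done by the
# Jinja macros changed in this PR, not by Python string formatting.

def render_old(timestamp_column: str) -> str:
    # Old behaviour: the configured value is reused verbatim as the alias.
    return (
        f"select cast({timestamp_column} as timestamp) as {timestamp_column} "
        "from some_monitored_table"
    )

def render_new(timestamp_column: str) -> str:
    # New behaviour: alias once to a fixed name that downstream CTEs reference.
    return (
        f"select cast({timestamp_column} as timestamp) as monitored_table_timestamp_column "
        "from some_monitored_table"
    )

expr = "case when updated_at is not null then updated_at else updated_at end"
print(render_old(expr))  # invalid SQL: a CASE expression cannot be used as a column alias
print(render_new(expr))  # valid SQL: the expression is computed once under a stable alias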
23 changes: 23 additions & 0 deletions integration_tests/tests/test_column_anomalies.py
@@ -107,3 +107,26 @@ def test_column_anomalies_with_where_parameter(test_id: str, dbt_project: DbtPro
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "fail"


def test_column_anomalies_with_timestamp_as_sql_expression(
    test_id: str, dbt_project: DbtProject
):
    utc_today = datetime.utcnow().date()
    data: List[Dict[str, Any]] = [
        {
            TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
            "superhero": superhero,
        }
        for cur_date in generate_dates(base_date=utc_today - timedelta(1))
        for superhero in ["Superman", "Batman"]
    ]
    test_args = {
        "timestamp_column": "case when updated_at is not null then updated_at else updated_at end",
        "column_anomalies": ["null_count"],
    }

    test_result = dbt_project.test(
        test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
    )
    assert test_result["status"] == "pass"
20 changes: 20 additions & 0 deletions integration_tests/tests/test_dimension_anomalies.py
@@ -23,6 +23,26 @@ def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject):
    assert test_result["status"] == "pass"


def test_dimension_anomalies_with_timestamp_as_sql_expression(
    test_id: str, dbt_project: DbtProject
):
    utc_today = datetime.utcnow().date()
    data: List[Dict[str, Any]] = [
        {
            TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
            "superhero": superhero,
        }
        for cur_date in generate_dates(base_date=utc_today - timedelta(1))
        for superhero in ["Superman", "Spiderman"]
    ]
    test_args = {
        "timestamp_column": "case when updated_at is not null then updated_at else updated_at end",
        "dimensions": ["superhero"],
    }
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject):
    utc_today = datetime.utcnow().date()
    test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
15 changes: 15 additions & 0 deletions integration_tests/tests/test_volume_anomalies.py
@@ -21,6 +21,21 @@ def test_anomalyless_table_volume_anomalies(test_id: str, dbt_project: DbtProjec
    assert test_result["status"] == "pass"


def test_table_volume_anomalies_with_timestamp_as_sql_expression(
    test_id: str, dbt_project: DbtProject
):
    utc_today = datetime.utcnow().date()
    data = [
        {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)}
        for cur_date in generate_dates(base_date=utc_today)
    ]
    test_args = {
        "timestamp_column": "case when updated_at is not null then updated_at else updated_at end"
    }
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_full_drop_table_volume_anomalies(test_id: str, dbt_project: DbtProject):
    utc_today = datetime.utcnow().date()
    data = [
@@ -73,9 +73,9 @@
{% macro get_timestamp_table_query(monitored_table, metric_properties, timestamp_column, table_monitors, min_bucket_start, max_bucket_end, full_table_name_str) %}
    with partially_time_filtered_monitored_table as (
        select
-           {{ elementary.edr_cast_as_timestamp(timestamp_column) }} as {{ timestamp_column }}
+           {{ elementary.edr_cast_as_timestamp(timestamp_column) }} as monitored_table_timestamp_column
        {%- if metric_properties.timestamp_column and metric_properties.event_timestamp_column %}
-           , {{ elementary.edr_cast_as_timestamp(metric_properties.event_timestamp_column) }} as {{ metric_properties.event_timestamp_column }}
+           , {{ elementary.edr_cast_as_timestamp(metric_properties.event_timestamp_column) }} as monitored_table_event_timestamp_column
        {%- endif %}
        from {{ monitored_table }}
        -- Freshness metric calculated differences between consecutive buckets, thus the first diff
@@ -88,7 +88,7 @@
        select
            *
        from partially_time_filtered_monitored_table
-       where {{ timestamp_column }} >= {{ elementary.edr_cast_as_timestamp(min_bucket_start) }}
+       where monitored_table_timestamp_column >= {{ elementary.edr_cast_as_timestamp(min_bucket_start) }}
    ),
    buckets as (
        select edr_bucket_start, edr_bucket_end
@@ -99,13 +99,13 @@

    time_filtered_monitored_table as (
        select *,
-           {{ elementary.get_start_bucket_in_data(timestamp_column, min_bucket_start, metric_properties.time_bucket) }} as start_bucket_in_data
+           {{ elementary.get_start_bucket_in_data('monitored_table_timestamp_column', min_bucket_start, metric_properties.time_bucket) }} as start_bucket_in_data
        from monitored_table
        where
-           {{ timestamp_column }} >= (select min(edr_bucket_start) from buckets)
-           and {{ timestamp_column }} < (select max(edr_bucket_end) from buckets)
+           monitored_table_timestamp_column >= (select min(edr_bucket_start) from buckets)
+           and monitored_table_timestamp_column < (select max(edr_bucket_end) from buckets)
            {# To avoid adding buckets before the table first timestamp #}
-           and {{ timestamp_column }} >= (select min({{ timestamp_column }}) from monitored_table)
+           and monitored_table_timestamp_column >= (select min(monitored_table_timestamp_column) from monitored_table)
    ),

    metrics as (
@@ -205,14 +205,9 @@
{% endmacro %}

{% macro freshness_metric_query(metric_properties) %}
-   {%- set freshness_column = metric_properties.freshness_column %}
-   {%- if not freshness_column %}
-       {%- set freshness_column = metric_properties.timestamp_column %}
-   {%- endif %}

    -- get ordered consecutive update timestamps in the source data
    with unique_timestamps as (
-       select distinct {{ elementary.edr_cast_as_timestamp(freshness_column) }} as timestamp_val
+       select distinct monitored_table_timestamp_column as timestamp_val
        from partially_time_filtered_monitored_table
        order by 1
    ),
@@ -278,15 +273,13 @@
{% endmacro %}

{% macro event_freshness_metric_query(metric_properties) %}
-   {% set event_timestamp_column = metric_properties.event_timestamp_column %}
-   {% set update_timestamp_column = metric_properties.timestamp_column %}
    select
        edr_bucket_start,
        edr_bucket_end,
        {{ elementary.const_as_string('event_freshness') }} as metric_name,
-       {{ elementary.edr_cast_as_string('max({})'.format(event_timestamp_column)) }} as source_value,
+       {{ elementary.edr_cast_as_string('max({})'.format('monitored_table_event_timestamp_column')) }} as source_value,
        {{ 'coalesce(max({}), {})'.format(
-           elementary.timediff('second', elementary.edr_cast_as_timestamp(event_timestamp_column), elementary.edr_cast_as_timestamp(update_timestamp_column)),
+           elementary.timediff('second', elementary.edr_cast_as_timestamp('monitored_table_event_timestamp_column'), elementary.edr_cast_as_timestamp('monitored_table_timestamp_column')),
            elementary.timediff('second', 'edr_bucket_start', 'edr_bucket_end')
        ) }} as metric_value
    from buckets left join time_filtered_monitored_table on (edr_bucket_start = start_bucket_in_data)
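Taken together, the macro changes follow one pattern: the configured expression is evaluated once in partially_time_filtered_monitored_table under the fixed aliases monitored_table_timestamp_column and monitored_table_event_timestamp_column, and the bucket filters, freshness query, and event-freshness query refer only to those aliases. Below is a rough, hypothetical Python sketch of the resulting query shape, reusing the CASE expression from the integration tests; the real query is assembled by the macros above and includes the bucketing and metric logic omitted here:

# Hypothetical, simplified approximation of the generated query shape after the
# fix; the table name and date literal are placeholders, and only the aliasing
# pattern mirrors the macros above.
TIMESTAMP_EXPRESSION = "case when updated_at is not null then updated_at else updated_at end"

GENERATED_QUERY_SKETCH = f"""
with partially_time_filtered_monitored_table as (
    select
        cast({TIMESTAMP_EXPRESSION} as timestamp) as monitored_table_timestamp_column
    from some_monitored_table
),
monitored_table as (
    select *
    from partially_time_filtered_monitored_table
    where monitored_table_timestamp_column >= cast('2000-01-01' as timestamp)
)
select
    min(monitored_table_timestamp_column) as first_bucket_candidate,
    max(monitored_table_timestamp_column) as last_update
from monitored_table
"""

print(GENERATED_QUERY_SKETCH)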