ELE-2932: Column anomalies group_by (#694)
* add group_by param

* wip populating dimension & dimension_value columns

* wip temp metrics table

* remove debug

* add multi concat

* concat is better

* add ifs

* wip fail ut

* win

* add dimension to test

* reuse package macros

* add uts

* test for description

* description is good now

* remove unused change

* change parameter name

* mistake

* cr
dapollak authored May 2, 2024
1 parent 639b7e2 commit 6124139
Showing 6 changed files with 279 additions and 17 deletions.
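
The feature under test: column-level anomaly metrics can now be computed per combination of one or more dimension columns, so each group is scored against its own history. A self-contained sketch of the grouping idea in plain Python (illustration only, not elementary's implementation):

    from collections import Counter

    rows = [
        {"superhero": None, "dimension": "dim1"},
        {"superhero": "Batman", "dimension": "dim2"},
        {"superhero": None, "dimension": "dim1"},
    ]
    # A per-group null_count instead of one count for the whole column:
    null_counts = Counter(r["dimension"] for r in rows if r["superhero"] is None)
    # Counter({'dim1': 2}) - each dimension value gets its own metric series
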
206 changes: 206 additions & 0 deletions integration_tests/tests/test_column_anomalies.py
@@ -194,3 +194,209 @@ def test_volume_anomaly_static_data_drop(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)
assert test_result["status"] == expected_result


def test_anomalyless_column_anomalies_group_by_pass(
test_id: str, dbt_project: DbtProject
):
utc_today = datetime.utcnow().date()
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"superhero": superhero,
"dimension1": "dim1",
"dimension2": "dim2",
}
for cur_date in generate_dates(base_date=utc_today - timedelta(1))
for superhero in ["Superman", "Batman"]
]
test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension1", "dimension2"]
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)
assert test_result["status"] == "pass"


def test_column_anomalies_group_by_fail(
test_id: str, dbt_project: DbtProject
):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"superhero": superhero,
"dimension": dim,
}
for cur_date in training_dates
for superhero in ["Superman", "Batman"]
for dim in ["dim1", "dim2"]
]

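    # Test day: inject 100 NULL values for the "dim1" group only.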
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension": "dim1",
}
for _ in range(100)
]

test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension"]
test_args["anomaly_sensitivity"] = 1
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)

assert test_result["status"] == "fail"
assert test_result["failures"] == 1

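    # Now anomalize "dim2" as well; even at sensitivity 3, both groups fail.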
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension": "dim2",
}
for _ in range(100)
]

test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension"]
test_args["anomaly_sensitivity"] = 3
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)

assert test_result["status"] == "fail"
assert test_result["failures"] == 2


def test_column_anomalies_group_by_none_dimension(
test_id: str, dbt_project: DbtProject
):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"superhero": superhero,
"dimension": dim,
}
for cur_date in training_dates
for superhero in ["Superman", "Batman"]
for dim in [None, "dim2"]
]

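    # Test day: anomalous NULL counts both for the NULL dimension group and for "dim2".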
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension": None,
}
for _ in range(100)
]
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension": "dim2",
}
for _ in range(100)
]

test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension"]
test_args["anomaly_sensitivity"] = 3
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)

assert test_result["status"] == "fail"
assert test_result["failures"] == 2


def test_column_anomalies_group_by_multi(
test_id: str, dbt_project: DbtProject
):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"superhero": superhero,
"dimension1": dim1,
"dimension2": dim2,
}
for cur_date in training_dates
for superhero in ["Superman", "Batman"]
for dim1 in ["dim1", "dim2"]
for dim2 in ["hey", "bye"]
]

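    # Test day: anomalies in three (dimension1, dimension2) combinations -
    # ("dim1", "hey"), ("dim2", "hey") and ("dim1", "bye").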
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension1": dim1,
"dimension2": dim2,
}
for _ in range(100)
for dim1 in ["dim1", "dim2"]
for dim2 in ["hey"]
]
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension1": dim1,
"dimension2": dim2,
}
for _ in range(100)
for dim1 in ["dim1"]
for dim2 in ["bye"]
]

test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension1", "dimension2"]
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)

assert test_result["status"] == "fail"
assert test_result["failures"] == 3


def test_column_anomalies_group_by_description(
test_id: str, dbt_project: DbtProject
):
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"superhero": superhero,
"dimension": "super_dimension",
}
for cur_date in training_dates
for superhero in ["Superman", "Batman"]
]
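    # Test day: "super_dimension" has training history, "dim_new" does not;
    # only the former can produce an anomaly score (hence a single failure).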
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"superhero": None,
"dimension": dim,
}
for _ in range(100)
for dim in ["dim_new", "super_dimension"]
]
test_args = DBT_TEST_ARGS.copy()
test_args["dimensions"] = ["dimension"]
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="superhero"
)

assert test_result["status"] == "fail"
assert test_result["failures"] == 1
assert "not enough data" not in test_result["test_results_description"].lower()
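
Grouped failure accounting, as these tests pin it down: each anomalous combination of dimension values on the test day contributes one failing metric row. A sketch of the arithmetic for the multi-dimension test above (plain Python, values from that test's data):

    anomalous_groups = [("dim1", "hey"), ("dim2", "hey"), ("dim1", "bye")]
    expected_failures = len(anomalous_groups)  # == 3, matching the final assert
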
8 changes: 7 additions & 1 deletion macros/edr/alerts/anomaly_detection_description.sql
@@ -1,6 +1,7 @@
{% macro anomaly_detection_description() %}
case
-        when dimension is not null then {{ elementary.dimension_metric_description() }}
+        when dimension is not null and column_name is null then {{ elementary.dimension_metric_description() }}
+        when dimension is not null and column_name is not null then {{ elementary.column_dimension_metric_description() }}
when metric_name = 'freshness' then {{ elementary.freshness_description() }}
when column_name is null then {{ elementary.table_metric_description() }}
when column_name is not null then {{ elementary.column_metric_description() }}
@@ -22,6 +23,11 @@
'. The average for this metric is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('training_avg') ~ ', 3)') }} || '.'
{% endmacro %}

+{% macro column_dimension_metric_description() %}
+    'In column ' || column_name || ', the last ' || metric_name || ' value for dimension ' || dimension || ' is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('metric_value') ~ ', 3)') }} ||
+    '. The average for this metric is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('training_avg') ~ ', 3)') }} || '.'
+{% endmacro %}

{% macro dimension_metric_description() %}
'The last ' || metric_name || ' value for dimension ' || dimension || ' - ' ||
case when dimension_value is null then 'NULL' else dimension_value end || ' is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('metric_value') ~ ', 3)') }} ||
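
For column metrics that carry a dimension, the case expression above now routes to the new column_dimension_metric_description macro. Roughly the string it renders, sketched in Python (the real macro concatenates in SQL; these values are invented):

    column_name, metric_name, dimension = "superhero", "null_count", "dimension1; dimension2"
    metric_value, training_avg = 100.0, 0.5
    description = (
        f"In column {column_name}, the last {metric_name} value for dimension {dimension} "
        f"is {round(metric_value, 3)}. The average for this metric is {round(training_avg, 3)}."
    )
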
@@ -1,6 +1,10 @@
-{% macro column_monitoring_query(monitored_table, monitored_table_relation, min_bucket_start, max_bucket_end, days_back, column_obj, column_monitors, metric_properties) %}
+{% macro column_monitoring_query(monitored_table, monitored_table_relation, min_bucket_start, max_bucket_end, days_back, column_obj, column_monitors, metric_properties, dimensions) %}
{%- set full_table_name_str = elementary.edr_quote(elementary.relation_to_full_name(monitored_table_relation)) %}
{%- set timestamp_column = metric_properties.timestamp_column %}
+    {% set prefixed_dimensions = [] %}
+    {% for dimension_column in dimensions %}
+        {% do prefixed_dimensions.append("dimension_" ~ dimension_column) %}
+    {% endfor %}


with monitored_table as (
@@ -18,6 +22,7 @@

filtered_monitored_table as (
select {{ column_obj.quoted }},
+        {%- if dimensions -%} {{ elementary.select_dimensions_columns(dimensions, "dimension") }}, {%- endif -%}
{{ elementary.get_start_bucket_in_data(timestamp_column, min_bucket_start, metric_properties.time_bucket) }} as start_bucket_in_data
from monitored_table
where
@@ -27,7 +32,8 @@
{%- else %}
filtered_monitored_table as (
select {{ column_obj.quoted }},
+        {%- if dimensions -%} {{ elementary.select_dimensions_columns(dimensions, "dimension") }}, {%- endif -%}
        {{ elementary.null_timestamp() }} as start_bucket_in_data
from monitored_table
),
{% endif %}
@@ -47,6 +53,9 @@
{{ elementary.null_int() }} as bucket_duration_hours,
{%- endif %}
{{ elementary.const_as_string(column_obj.name) }} as edr_column_name,
+        {% if dimensions | length > 0 %}
+            {{ elementary.select_dimensions_columns(prefixed_dimensions) }},
+        {% endif %}
{%- if 'null_count' in column_monitors -%} {{ elementary.null_count(column) }} {%- else -%} null {% endif %} as null_count,
{%- if 'null_percent' in column_monitors -%} {{ elementary.null_percent(column) }} {%- else -%} null {% endif %} as null_percent,
{%- if 'not_null_percent' in column_monitors -%} {{ elementary.not_null_percent(column) }} {%- else -%} null {% endif %} as not_null_percent,
@@ -71,7 +80,11 @@
{%- if timestamp_column %}
left join buckets on (edr_bucket_start = start_bucket_in_data)
{%- endif %}
-    group by 1,2,3,4
+    {% if dimensions | length > 0 %}
+        group by 1,2,3,4,{{ elementary.select_dimensions_columns(prefixed_dimensions) }}
+    {% else %}
+        group by 1,2,3,4
+    {% endif %}
{%- else %}
{{ elementary.empty_column_monitors_cte() }}
{%- endif %}
@@ -82,7 +95,21 @@

{%- if column_monitors %}
{% for monitor in column_monitors %}
-    select edr_column_name, bucket_start, bucket_end, bucket_duration_hours, {{ elementary.edr_cast_as_string(elementary.edr_quote(monitor)) }} as metric_name, {{ elementary.edr_cast_as_float(monitor) }} as metric_value from column_monitors where {{ monitor }} is not null
+    select
+        edr_column_name,
+        bucket_start,
+        bucket_end,
+        bucket_duration_hours,
+        {% if dimensions | length > 0 %}
+            {{ elementary.const_as_string(elementary.join_list(dimensions, separator='; ')) }} as dimension,
+            {{ elementary.list_concat_with_separator(prefixed_dimensions, separator='; ') }} as dimension_value,
+        {% else %}
+            {{ elementary.null_string() }} as dimension,
+            {{ elementary.null_string() }} as dimension_value,
+        {% endif %}
+        {{ elementary.edr_cast_as_float(monitor) }} as metric_value,
+        {{ elementary.edr_cast_as_string(elementary.edr_quote(monitor)) }} as metric_name
+    from column_monitors where {{ monitor }} is not null
{% if not loop.last %} union all {% endif %}
{%- endfor %}
{%- else %}
@@ -102,8 +129,8 @@
bucket_start,
bucket_end,
bucket_duration_hours,
-    {{ elementary.null_string() }} as dimension,
-    {{ elementary.null_string() }} as dimension_value,
+    dimension,
+    dimension_value,
{{elementary.dict_to_quoted_json(metric_properties) }} as metric_properties
from column_monitors_unpivot

@@ -114,6 +141,8 @@
'full_table_name',
'column_name',
'metric_name',
+    'dimension',
+    'dimension_value',
'bucket_end',
'metric_properties'
]) }} as id,
@@ -132,3 +161,21 @@
from metrics_final

{% endmacro %}

+{% macro select_dimensions_columns(dimension_columns, as_prefix="") %}
+    {% set select_statements %}
+        {%- for column in dimension_columns -%}
+            {{ column }}
+            {%- if as_prefix -%}
+                {{ " as " ~ as_prefix ~ "_" ~ column }}
+            {%- endif -%}
+            {%- if not loop.last -%}
+                {{ ", " }}
+            {%- endif -%}
+        {%- endfor -%}
+    {% endset %}
+    {{ return(select_statements) }}
+{% endmacro %}
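
To make the rendered SQL concrete, here is a Python mirror of the helpers this query leans on: select_dimensions_columns for the select lists, plus the '; ' joins (join_list and list_concat_with_separator) that label each group in the unpivoted metric rows. A sketch under those assumptions, not the actual Jinja execution:

    def select_dimensions_columns(dimension_columns, as_prefix=""):
        # Mirrors the macro above: optionally alias each column with a prefix.
        return ", ".join(
            f"{col} as {as_prefix}_{col}" if as_prefix else col
            for col in dimension_columns
        )

    dimensions = ["dimension1", "dimension2"]
    select_dimensions_columns(dimensions, "dimension")
    # -> 'dimension1 as dimension_dimension1, dimension2 as dimension_dimension2'

    # How each unpivoted metric row is labeled, per the select above:
    row = {"dimension_dimension1": "dim1", "dimension_dimension2": "hey"}
    dimension = "; ".join(dimensions)                # 'dimension1; dimension2'
    dimension_value = "; ".join(row["dimension_" + d] for d in dimensions)  # 'dim1; hey'
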
8 changes: 4 additions & 4 deletions macros/edr/materializations/test/test.sql
@@ -144,6 +144,7 @@

{% macro get_anomaly_test_result_row(flattened_test, anomaly_scores_rows) %}
{%- set latest_row = anomaly_scores_rows[-1] %}
+    {%- set rows_with_score = anomaly_scores_rows | rejectattr("anomaly_score", "none") | list %}
{%- set full_table_name = elementary.insensitive_get_dict_value(latest_row, 'full_table_name') %}
{%- set test_unique_id = flattened_test.unique_id %}
{%- set test_configuration = elementary.get_cache(test_unique_id) %}
@@ -153,8 +154,7 @@
{%- set column_name = elementary.insensitive_get_dict_value(latest_row, 'column_name') %}
{%- set metric_name = elementary.insensitive_get_dict_value(latest_row, 'metric_name') %}
{%- set test_unique_id = elementary.insensitive_get_dict_value(latest_row, 'test_unique_id') %}
-    {%- set has_anomaly_score = elementary.insensitive_get_dict_value(latest_row, 'anomaly_score') is not none %}
-    {%- if not has_anomaly_score %}
+    {%- if not rows_with_score %}
{% do elementary.edr_log("Not enough data to calculate anomaly scores on `{}`".format(test_unique_id)) %}
{% endif %}
{%- set test_results_query -%}
@@ -168,8 +168,8 @@
{%- endif %}
{%- endset -%}
{% set test_results_description %}
-    {% if has_anomaly_score %}
-        {{ elementary.insensitive_get_dict_value(latest_row, 'anomaly_description') }}
+    {% if rows_with_score %}
+        {{ elementary.insensitive_get_dict_value(rows_with_score[-1], 'anomaly_description') }}
{% else %}
Not enough data to calculate anomaly score.
{% endif %}
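
The net effect of this change: the result row's description is taken from the latest row that actually has an anomaly score, instead of the overall latest row, so trailing buckets without a score no longer hide a real anomaly behind "Not enough data". A minimal Python sketch of the selection (field names as in the macro; sample rows invented):

    anomaly_scores_rows = [
        {"anomaly_score": 0.2, "anomaly_description": "within expected range"},
        {"anomaly_score": 4.1, "anomaly_description": "spike in null_count"},
        {"anomaly_score": None, "anomaly_description": None},  # unscored bucket
    ]
    rows_with_score = [r for r in anomaly_scores_rows if r["anomaly_score"] is not None]
    if rows_with_score:
        description = rows_with_score[-1]["anomaly_description"]
    else:
        description = "Not enough data to calculate anomaly score."
    assert description == "spike in null_count"
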