Skip to content

Commit

Permalink
Fix: #22 Min/Max Aggregations (#23)
Browse files Browse the repository at this point in the history
## Description
- Fixes a bug whereby all relationships operating on columns using
min/max aggregations, would not select the correct field.
- Some other house keeping things including:
- Replaces the `stream(i)` macro for generating an alias name with the
constants `appended()` and `primary()` for clarity and simplicity.
- The `appended()` and `primary()` macros should now be used directly in
the `additional_join_condition` arg to an activity.
  • Loading branch information
tnightengale authored Mar 23, 2023
1 parent b8e4ec8 commit eb5f6bf
Show file tree
Hide file tree
Showing 36 changed files with 255 additions and 230 deletions.
65 changes: 44 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Use the [dataset macro](#dataset-source) to self-join an Activity Stream using
The [dataset macro](#dataset-source) will compile based on the provided
[activity macros](#activity-source) and the [relationship
macros](#relationships). It can then be nested in a CTE in a dbt-Core model. Eg:
```c
```sql
// my_first_dataset.sql

with
Expand Down Expand Up @@ -218,28 +218,51 @@ dataset.

- **`additional_join_condition (optional)`** : str

A valid SQL boolean to condition the join of the appended activity. Can
optionally contain the python f-string placeholders `{primary}` and
`{appended}` in the string. These placeholders will be compiled by the
[dataset macro](./macros/dataset.sql) with the correct SQL aliases for the
joins between the primary activity and the appended activity.
A valid sql boolean expression that is added to the join condition of the
appended activity. The expression is an `and` with the condition created by
the [relationship](#relationships).

The expression can optionally contain either or both of the `{{ primary() }}`
and `{{ appended() }}` macros, which are used to alias the primary and
appended activities respectively. If using these aliases in the expression, it
must be first assigned to a set block. Eg:

```sql
// my_second_dataset.sql

{% set join_condition %}
json_extract({{ dbt_activity_schema.primary() }}.feature_json, 'type')
= json_extract({{ dbt_activity_schema.appended() }}.feature_json, 'type')
{% endset %}


{{
dbt_activity_schema.dataset(
ref("activity_schema"),
dbt_activity_schema.activity(
dbt_activity_schema.all_ever(),
"signed up"
),
[
dbt_activity_schema.activity(
dbt_activity_schema.first_after(),
"visit page",
additional_join_condition=join_condition
)
]
)
}}

Eg:
```python
"json_extract({primary}.feature_json, 'dim1') =
json_extract({appended}.feature_json, 'dim1')"
```
The `{primary}` and `{appended}` placeholders compile according to the
cardinality of the activity in the `appended_activities` list argument to
`dataset.sql`.

Compiled:
```python
"json_extract(stream.feature_json, 'dim1') =
json_extract(stream_3.feature_json, 'dim1')"
```
Given that the appended activity was 3rd in the `appended_activities` list
argument.

The `{{ primary() }}` and `{{ appended() }}` placeholders are constants for
the aliases used in the joins of the [dataset macro](#dataset-source). Columns
used in the expression must be fully qualified with these aliases.

In the above example, the value of the `type` key in the `feature_json` of the
primary activity `feature_json` must match the value of the `type` key in the
`feature_json` of the appended activity, in addition to the relationship join
conditions.

## Relationships
In the Activity Schema framework,
Expand Down
10 changes: 6 additions & 4 deletions integration_tests/models/first_after/dataset__first_after_3.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
{% set join_condition %}
json_extract({{ dbt_activity_schema.primary() }}.feature_json, 'type')
= json_extract({{ dbt_activity_schema.appended() }}.feature_json, 'type')
{% endset %}

{{
dbt_activity_schema.dataset(
ref("input__first_after"),
Expand All @@ -21,10 +26,7 @@
"activity_occurrence",
"ts"
],
additional_join_condition="
json_extract({primary}.feature_json, 'type')
= json_extract({appended}.feature_json, 'type')
"
additional_join_condition=join_condition
)
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
dbt_activity_schema.activity(
dbt_activity_schema.first_in_between(),
"bought something",
["feature_json", "ts"]
["feature_json", "ts", "revenue_impact"]
)
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
dbt_activity_schema.activity(
dbt_activity_schema.first_in_between(),
"bought something",
["feature_json", "ts"]
["feature_json", "ts", "revenue_impact"]
)
]
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
{% set join_condition %}
json_extract({{ dbt_activity_schema.primary() }}.feature_json, 'type')
= json_extract({{ dbt_activity_schema.appended() }}.feature_json, 'type')
{% endset %}

{{
dbt_activity_schema.dataset(
ref("input__first_in_between"),
Expand All @@ -21,15 +26,13 @@
"activity_occurrence",
"ts"
],
additional_join_condition="
json_extract({primary}.feature_json, 'type')
= json_extract({appended}.feature_json, 'type')
"
additional_join_condition=join_condition
),
dbt_activity_schema.activity(
dbt_activity_schema.first_in_between(),
"bought something",
[
"revenue_impact",
"activity_id",
"ts"
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{
dbt_activity_schema.dataset(
ref("input__last_in_between"),
dbt_activity_schema.activity(dbt_activity_schema.all_ever(), "visit page"),
[
dbt_activity_schema.activity(
dbt_activity_schema.last_in_between(),
"bought something",
["activity_id", "revenue_impact"]
)
]
)
}}
6 changes: 6 additions & 0 deletions integration_tests/models/last_in_between/last_in_between.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ models:
tests:
- dbt_utils.equality:
compare_model: ref("output__last_in_between_2")

- name: dataset__last_in_between_3
description: A dataset model used to test a case where the `last_in_between` value of a column from an appended activity is not the maximum value for that column.
tests:
- dbt_utils.equality:
compare_model: ref("output__last_in_between_3")
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
activity_id,ts,entity_uuid,activity,anonymous_entity_uuid,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at
2,2022-01-01 22:10:11,1,visit page,,"{""type"": 1}",0,,1,2022-01-03 22:10:11
3,2022-01-02 22:10:11,1,signed up,,"{""type"": 1}",0,,1,
4,2022-01-03 22:10:11,1,visit page,,"{""type"": 2}",0,,2,2022-01-04 22:10:11
5,2022-01-04 22:10:11,1,visit page,,"{""type"": 2}",0,,3,2022-01-06 22:10:11
6,2022-01-05 22:10:11,1,bought something,,"{""type"": 1}",100,,1,
7,2022-01-06 22:10:11,1,visit page,,"{""type"": 1}",0,,4,
8,2022-01-07 22:10:11,7,visit page,,"{""type"": 1}",0,,1,2022-01-09 22:10:11
9,2022-01-08 22:10:11,7,signed up,,"{""type"": 1}",0,,1,
10,2022-01-09 22:10:11,7,visit page,,"{""type"": 2}",0,,2,2022-01-10 22:10:11
11,2022-01-10 22:10:11,7,visit page,,"{""type"": 2}",0,,3,2022-01-12 22:10:11
12,2022-01-11 22:10:11,7,bought something,,"{""type"": 1}",100,,1,
13,2022-01-12 22:10:11,7,visit page,,"{""type"": 1}",0,,4,
11,2022-01-01 22:10:11,1,visit page,,"{""type"": 1}",0,,1,2022-01-03 22:10:11
22,2022-01-02 22:10:11,1,signed up,,"{""type"": 1}",0,,1,
33,2022-01-03 22:10:11,1,visit page,,"{""type"": 2}",0,,2,2022-01-04 22:10:11
44,2022-01-04 22:10:11,1,visit page,,"{""type"": 2}",0,,3,2022-01-06 22:10:11
55,2022-01-05 22:10:11,1,bought something,,"{""type"": 1}",100,,1,2022-01-05 22:10:12
66,2022-01-05 22:10:12,1,bought something,,"{""type"": 2}",99,,2,
77,2022-01-06 22:10:11,1,visit page,,"{""type"": 1}",0,,4,
88,2022-01-07 22:10:11,7,visit page,,"{""type"": 1}",0,,1,2022-01-09 22:10:11
99,2022-01-08 22:10:11,7,signed up,,"{""type"": 1}",0,,1,
1010,2022-01-09 22:10:11,7,visit page,,"{""type"": 2}",0,,2,2022-01-10 22:10:11
1111,2022-01-10 22:10:11,7,visit page,,"{""type"": 2}",0,,3,2022-01-12 22:10:11
1212,2022-01-11 22:10:11,7,bought something,,"{""type"": 1}",100,,1,2022-01-11 22:10:12
1313,2022-01-11 22:10:12,7,bought something,,"{""type"": 2}",99,,2,
1414,2022-01-12 22:10:11,7,visit page,,"{""type"": 1}",0,,4,
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
activity_id,entity_uuid,ts,activity,anonymous_entity_uuid,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at,first_in_between_bought_something_feature_json,first_in_between_bought_something_ts
3,1,2022-01-02 22:10:11,signed up,,"{""type"": 1}",0,,1,,"{""type"": 1}",2022-01-05 22:10:11
9,7,2022-01-08 22:10:11,signed up,,"{""type"": 1}",0,,1,,"{""type"": 1}",2022-01-11 22:10:11
activity_id,entity_uuid,ts,revenue_impact,first_in_between_bought_something_feature_json,first_in_between_bought_something_ts,first_in_between_bought_something_revenue_impact
22,1,2022-01-02 22:10:11,0,"{""type"": 1}",2022-01-05 22:10:11,100.000000
99,7,2022-01-08 22:10:11,0,"{""type"": 1}",2022-01-11 22:10:11,100.000000
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
activity_id,entity_uuid,ts,activity,anonymous_entity_uuid,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at,first_in_between_bought_something_feature_json,first_in_between_bought_something_ts
5,1,2022-01-04 22:10:11,visit page,,"{""type"": 2}",0,,3,2022-01-06 22:10:11,"{""type"": 1}",2022-01-05 22:10:11
11,7,2022-01-10 22:10:11,visit page,,"{""type"": 2}",0,,3,2022-01-12 22:10:11,"{""type"": 1}",2022-01-11 22:10:11
2,1,2022-01-01 22:10:11,visit page,,"{""type"": 1}",0,,1,2022-01-03 22:10:11,,
4,1,2022-01-03 22:10:11,visit page,,"{""type"": 2}",0,,2,2022-01-04 22:10:11,,
7,1,2022-01-06 22:10:11,visit page,,"{""type"": 1}",0,,4,,,
8,7,2022-01-07 22:10:11,visit page,,"{""type"": 1}",0,,1,2022-01-09 22:10:11,,
10,7,2022-01-09 22:10:11,visit page,,"{""type"": 2}",0,,2,2022-01-10 22:10:11,,
13,7,2022-01-12 22:10:11,visit page,,"{""type"": 1}",0,,4,,,
activity_id,entity_uuid,ts,revenue_impact,first_in_between_bought_something_feature_json,first_in_between_bought_something_ts,first_in_between_bought_something_revenue_impact
11,1,2022-01-01 22:10:11,0,,,
33,1,2022-01-03 22:10:11,0,,,
44,1,2022-01-04 22:10:11,0,"{""type"": 1}",2022-01-05 22:10:11,100.000000
77,1,2022-01-06 22:10:11,0,,,
88,7,2022-01-07 22:10:11,0,,,
1010,7,2022-01-09 22:10:11,0,,,
1111,7,2022-01-10 22:10:11,0,"{""type"": 1}",2022-01-11 22:10:11,100.000000
1414,7,2022-01-12 22:10:11,0,,,
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
activity_id,entity_uuid,ts,revenue_impact,feature_json,first_in_between_visit_page_feature_json,first_in_between_visit_page_activity_occurrence,first_in_between_visit_page_ts,first_in_between_bought_something_activity_id,first_in_between_bought_something_ts
3,1,2022-01-02 22:10:11,0,"{""type"": 1}","{""type"": 1}",4,2022-01-06 22:10:11,6,2022-01-05 22:10:11
9,7,2022-01-08 22:10:11,0,"{""type"": 1}","{""type"": 1}",4,2022-01-12 22:10:11,12,2022-01-11 22:10:11
activity_id,entity_uuid,ts,revenue_impact,feature_json,first_in_between_visit_page_feature_json,first_in_between_visit_page_activity_occurrence,first_in_between_visit_page_ts,first_in_between_bought_something_revenue_impact,first_in_between_bought_something_activity_id,first_in_between_bought_something_ts
22,1,2022-01-02 22:10:11,0,"{""type"": 1}","{""type"": 1}",4.000000,2022-01-06 22:10:11,100.000000,55,2022-01-05 22:10:11
99,7,2022-01-08 22:10:11,0,"{""type"": 1}","{""type"": 1}",4.000000,2022-01-12 22:10:11,100.000000,1212,2022-01-11 22:10:11
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ activity_id,ts,entity_uuid,activity,anonymous_entity_uuid,feature_json,revenue_i
3,2022-01-02 22:10:11,1,signed up,,"[{""signed up"": 1}]",0,,1,
4,2022-01-03 22:10:11,1,visit page,,"[{""visited page"": 2}]",0,,2,2022-01-04 22:10:11
5,2022-01-04 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-06 22:10:11
6,2022-01-05 22:10:11,1,bought something,,"[{""bought something"": 1}]",100,,1,
6,2022-01-05 22:10:11,1,bought something,,"[{""bought something"": 1}]",101,,1,2022-01-05 22:10:12
61,2022-01-05 22:10:12,1,bought something,,"[{""bought something"": 1}]",100,,2,
7,2022-01-06 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,4,
8,2022-01-07 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,1,2022-01-09 22:10:11
9,2022-01-08 22:10:11,7,signed up,,"[{""signed up"": 1}]",0,,1,
10,2022-01-09 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,2,2022-01-10 22:10:11
11,2022-01-10 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-12 22:10:11
12,2022-01-11 22:10:11,7,bought something,,"[{""bought something"": 1}]",100,,1,
12,2022-01-11 22:10:11,7,bought something,,"[{""bought something"": 1}]",101,,1,2022-01-11 22:10:12
121,2022-01-11 22:10:12,7,bought something,,"[{""bought something"": 1}]",100,,2,
13,2022-01-12 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,4,
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
activity_id,entity_uuid,ts,revenue_impact,last_in_between_bought_something_activity_id,last_in_between_bought_something_revenue_impact
2,1,2022-01-01 22:10:11,0,,
4,1,2022-01-03 22:10:11,0,,
5,1,2022-01-04 22:10:11,0,61,100.000000
7,1,2022-01-06 22:10:11,0,,
8,7,2022-01-07 22:10:11,0,,
10,7,2022-01-09 22:10:11,0,,
11,7,2022-01-10 22:10:11,0,121,100.000000
13,7,2022-01-12 22:10:11,0,,
38 changes: 21 additions & 17 deletions macros/dataset.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ params:
#}

{% set columns = dbt_activity_schema.columns() %}
{% set stream = dbt_activity_schema.alias_stream %}
{% set primary = dbt_activity_schema.primary %}
{% set appended = dbt_activity_schema.appended %}
{% set alias_cte = dbt_activity_schema.alias_cte %}
{% set alias_column = dbt_activity_schema.alias_column %}
{% set alias_appended_activity = dbt_activity_schema.alias_appended_activity %}
Expand All @@ -41,12 +42,12 @@ with
filter_activity_stream_using_primary_activity as (
select
{% for col in primary_activity.included_columns + primary_activity.required_columns %}
{{ alias_column(col) }}{%- if not loop.last -%},{%- endif %}
{{ primary() }}.{{ col }}{%- if not loop.last -%},{%- endif %}
{% endfor %}

from {{ activity_stream }} as {{ stream() }}
from {{ activity_stream }} as {{ primary() }}

where {{ alias_column(columns.activity) }} = {{ dbt.string_literal(primary_activity.name) }}
where {{ primary() }}.{{ columns.activity }} = {{ dbt.string_literal(primary_activity.name) }}
and {{ primary_activity.relationship.where_clause }}
),

Expand All @@ -57,40 +58,43 @@ filter_activity_stream_using_primary_activity as (

-- Primary Activity Columns
{% for col in primary_activity.included_columns %}
{{ stream() }}.{{- col }},
{{ primary() }}.{{- col }},
{% endfor %}

{% for col in activity.included_columns %}
{{ render_agg(col, activity, i) }}{% if not loop.last %},{% endif %}
{% call activity.relationship.aggregation_func() %}
{{ appended() }}.{{ col }}
{% endcall %} as {{ dbt_activity_schema.alias_appended_activity(activity, col) }}
{% if not loop.last %},{% endif %}
{% endfor %}

from filter_activity_stream_using_primary_activity as {{ stream() }}
from filter_activity_stream_using_primary_activity as {{ primary() }}

left join {{ activity_stream }} as {{ stream(i) }}
left join {{ activity_stream }} as {{ appended() }}
on (
-- Join on Customer UUID Column
{{ stream(i) }}.{{ columns.customer }} = {{ stream() }}.{{ columns.customer }}
{{ appended() }}.{{ columns.customer }} = {{ primary() }}.{{ columns.customer }}

-- Join the Correct Activity
and {{ stream(i) }}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}
and {{ appended() }}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}

-- Relationship Specific Join Conditions
and (
{# nth_ever_join_clause relies on instantiated nth_occurance arg, in
addition to the i passed to the join #}
{% if activity.relationship.name == "nth_ever" %}
{{ activity.relationship.join_clause(activity.relationship.nth_occurance, i) }}
{{ activity.relationship.join_clause(activity.relationship.nth_occurance) }}
{% else %}
{{ activity.relationship.join_clause(i) }}
{{ activity.relationship.join_clause() }}
{% endif %}
)
-- Additional Join Condition
and ( {{ render_join(activity.additional_join_condition, i) }} )
and ( {{ activity.additional_join_condition }} )
)

group by
{% for col in primary_activity.included_columns %}
{{ alias_column(col) }}{%- if not loop.last -%},{%- endif %}
{{ primary() }}.{{ col }}{%- if not loop.last -%},{%- endif %}
{% endfor %}
),

Expand All @@ -100,7 +104,7 @@ rejoin_aggregated_activities as (
select

{% for col in primary_activity.included_columns %}
{{ alias_column(col) }},
{{ primary() }}.{{ col }},
{% endfor %}

{% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
Expand All @@ -109,12 +113,12 @@ rejoin_aggregated_activities as (
{% endfor %}
{% endfor %}

from filter_activity_stream_using_primary_activity as {{ stream() }}
from filter_activity_stream_using_primary_activity as {{ primary() }}

{% for activity in appended_activities %}{% set i = loop.index %}

left join {{ alias_cte(activity, i) }}
on {{ alias_cte(activity, i) }}.{{ columns.activity_id }} = {{ stream() }}.{{ columns.activity_id }}
on {{ alias_cte(activity, i) }}.{{ columns.activity_id }} = {{ primary() }}.{{ columns.activity_id }}

{% endfor %}
)
Expand Down
5 changes: 3 additions & 2 deletions macros/relationships/append_only/aggregate_after.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{% macro aggregate_after_join_clause(i) %}

{% set stream = dbt_activity_schema.alias_stream %}
{% set primary = dbt_activity_schema.primary %}
{% set columns = dbt_activity_schema.columns() %}
{% set appended = dbt_activity_schema.appended %}

(
{{ stream(i) }}.{{- columns.ts }} > {{ stream() }}.{{- columns.ts }}
{{ appended() }}.{{- columns.ts }} > {{ primary() }}.{{- columns.ts }}
)
{% endmacro %}

Expand Down
5 changes: 3 additions & 2 deletions macros/relationships/append_only/aggregate_before.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{% macro aggregate_before_join_clause(i) %}

{% set stream = dbt_activity_schema.alias_stream %}
{% set primary = dbt_activity_schema.primary %}
{% set columns = dbt_activity_schema.columns() %}
{% set appended = dbt_activity_schema.appended %}

(
{{ stream(i) }}.{{- columns.ts }} < {{ stream() }}.{{- columns.ts }}
{{ appended() }}.{{- columns.ts }} < {{ primary() }}.{{- columns.ts }}
)
{% endmacro %}

Expand Down
Loading

0 comments on commit eb5f6bf

Please sign in to comment.