Skip to content

Commit

Permalink
Fix: Unexpected backfill of a parent of a changed forward-only child when the child runs before the parent (#3871)
Browse files: browse the repository at this point in the history
  • Loading branch information
izeigerman committed Feb 19, 2025
1 parent 959b606 commit 4ef90ce
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
20 changes: 12 additions & 8 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,16 +1712,20 @@ def missing_intervals(
snapshot_start_date = start_dt
snapshot_end_date: TimeLike = end_date

existing_interval_end = interval_end_per_model.get(snapshot.name)
if existing_interval_end and existing_interval_end > to_timestamp(snapshot_start_date):
snapshot_end_date = existing_interval_end

interval = restatements.get(snapshot.snapshot_id)
if interval:
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in interval)
restated_interval = restatements.get(snapshot.snapshot_id)
if restated_interval:
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in restated_interval)
snapshot = snapshot.copy()
snapshot.intervals = snapshot.intervals.copy()
snapshot.remove_interval(interval)
snapshot.remove_interval(restated_interval)
else:
existing_interval_end = interval_end_per_model.get(snapshot.name)
if existing_interval_end:
if to_timestamp(snapshot_start_date) >= existing_interval_end:
# The start exceeds the provided interval end, so we can skip this snapshot
# since it doesn't have missing intervals by definition
continue
snapshot_end_date = existing_interval_end

missing_interval_end_date = snapshot_end_date
node_end_date = snapshot.node.end
Expand Down
67 changes: 67 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,73 @@ def test_cron_not_aligned_with_day_boundary_new_model(init_and_plan_context: t.C
]


@time_machine.travel("2023-01-08 00:00:00 UTC")
def test_forward_only_preview_child_that_runs_before_parent(init_and_plan_context: t.Callable):
    """Preview of a changed forward-only child must not pull in its parent.

    Timeline exercised by this test:

    * 00:00 - both models are planned and applied to ``prod``.
    * 00:05 - ``prod`` is run; per the crons below only the child is due.
    * 00:35 - the parent's cron has elapsed but it has not been run; the child
      is modified and a ``dev`` plan with preview enabled is built.

    The ``dev`` plan is expected to list missing intervals for the child only.
    """
    context, _ = init_and_plan_context("examples/sushi")

    # Parent model: its cron fires at minute 30 of every hour.
    # (SQL text below is kept verbatim, hence the unindented lines.)
    parent_definition = d.parse(
        """
MODEL (
name memory.sushi.upstream_model,
kind FULL,
cron '30 * * * *',
start '2023-01-01',
);
SELECT 1 AS a;
"""
    )
    parent_model = load_sql_based_model(parent_definition)
    context.upsert_model(parent_model)

    # Child model: its cron fires at minute 0 of every hour, i.e. ahead of the parent.
    child_definition = d.parse(
        """
MODEL (
name memory.sushi.downstream_model,
kind INCREMENTAL_BY_TIME_RANGE(
time_column event_date,
forward_only True,
),
cron '0 * * * *',
start '2023-01-01',
);
SELECT a, '2023-01-06' AS event_date FROM memory.sushi.upstream_model;
"""
    )
    child_model = load_sql_based_model(child_definition)
    context.upsert_model(child_model)

    context.plan("prod", skip_tests=True, auto_apply=True)

    # Five minutes past the hour: the child is due, the parent is not.
    with time_machine.travel("2023-01-08 00:05:00 UTC"):
        context.run("prod")

    # Thirty-five minutes past the hour: the parent is due by now but has not been run.
    with time_machine.travel("2023-01-08 00:35:00 UTC"):
        # Introduce a change to the child model.
        child_model = add_projection_to_model(t.cast(SqlModel, child_model), literal=True)
        context.upsert_model(child_model)

        # Only the child should be backfilled, even though the parent has missing intervals.
        dev_plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build()
        actual_missing = dev_plan.missing_intervals

        child_snapshot = context.get_snapshot(child_model.name, raise_if_missing=True)
        preview_interval = (
            to_timestamp("2023-01-07 23:00:00"),
            to_timestamp("2023-01-08 00:00:00"),
        )
        expected_missing = [
            SnapshotIntervals(
                snapshot_id=child_snapshot.snapshot_id,
                intervals=[preview_interval],
            ),
        ]
        assert actual_missing == expected_missing


@time_machine.travel("2023-01-08 00:00:00 UTC")
def test_forward_only_monthly_model(init_and_plan_context: t.Callable):
context, _ = init_and_plan_context("examples/sushi")
Expand Down
3 changes: 1 addition & 2 deletions tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2197,11 +2197,10 @@ def test_missing_intervals_interval_end_per_model(make_snapshot):
snapshot_a.name: to_timestamp("2023-01-09"),
snapshot_b.name: to_timestamp(
"2023-01-06"
), # The interval end is before the start. This should be ignored.
), # The interval end is before the start. The snapshot will be skipped
},
) == {
snapshot_a: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
snapshot_b: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
}


Expand Down

0 comments on commit 4ef90ce

Please sign in to comment.