Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-83: Restore Uniqueness Constraint on Logical Date, Make It Nullable #46295

Merged
merged 47 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
d0e22ad
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
2b11aed
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
237360d
fix migration file
vatsrahul1001 Jan 29, 2025
6f423c1
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
18efafb
fix migration file
vatsrahul1001 Jan 29, 2025
8f9e076
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 29, 2025
1c47c75
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
d3bfe20
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Jan 30, 2025
0fa2477
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 30, 2025
75a684a
fixing tests
vatsrahul1001 Jan 30, 2025
cefc9a3
refactor backfill reprocess logic
vatsrahul1001 Jan 30, 2025
8f84606
fixing tests
vatsrahul1001 Jan 30, 2025
f239846
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Jan 30, 2025
322b25c
fix tests
vatsrahul1001 Feb 1, 2025
0c24eff
fix tests
vatsrahul1001 Feb 1, 2025
f74c84b
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 1, 2025
53d329b
resolve conflict
vatsrahul1001 Feb 3, 2025
c3e29c7
remove default date from logical date in dag run model
vatsrahul1001 Feb 3, 2025
0830d58
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Feb 3, 2025
2b4251d
fix task_filter
vatsrahul1001 Feb 3, 2025
3ec1ecd
merge main
vatsrahul1001 Feb 3, 2025
c05c19e
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Feb 3, 2025
4b223d8
Merge branch 'refactor-backfill-reprocess-logic' of github.com:astron…
vatsrahul1001 Feb 3, 2025
16cdaa1
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
5680a76
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Feb 3, 2025
a4bc83d
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
90595d6
fix failing tests
vatsrahul1001 Feb 3, 2025
8b654b5
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
16cd717
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Feb 3, 2025
7d6c9f4
fix tests
vatsrahul1001 Feb 3, 2025
02a82a0
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Feb 3, 2025
83b62bb
implement review comments
vatsrahul1001 Feb 4, 2025
7ca4de3
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Feb 4, 2025
0e507de
Merge branch 'main' of github.com:astronomer/airflow into refactor-ba…
vatsrahul1001 Feb 4, 2025
c64a60e
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 4, 2025
bbef51a
implement review comment
vatsrahul1001 Feb 4, 2025
8cef9f4
fix tests
vatsrahul1001 Feb 4, 2025
be405ca
Merge branch 'refactor-backfill-reprocess-logic' of github.com:astron…
vatsrahul1001 Feb 4, 2025
26cf27b
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 4, 2025
a50083c
Improve logical date setup for asset API tests
uranusjr Feb 5, 2025
0bedc8d
Better fix for logical date duplication
uranusjr Feb 5, 2025
8c14712
Use the run_id to clear that one specific run
uranusjr Feb 5, 2025
a5af752
Merge branch 'main' of github.com:astronomer/airflow into refactor-ba…
vatsrahul1001 Feb 5, 2025
28c195d
fix tests
vatsrahul1001 Feb 5, 2025
10b1199
remove backfill changes + xfail backfill tests
vatsrahul1001 Feb 5, 2025
da8e5ce
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 5, 2025
e08a227
Improve setup in API tests
uranusjr Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# under the License.

"""
Drop ``execution_date`` unique constraint on DagRun.
Make logical_date nullable.

The column has also been renamed to logical_date, although the Python model is
The column has been renamed to logical_date, although the Python model is
not changed. This allows us to not need to fix all the Python code at once, but
still do the two changes in one migration instead of two.

Expand Down Expand Up @@ -49,10 +49,15 @@ def upgrade():
"execution_date",
new_column_name="logical_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
nullable=True,
)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_logical_date_key",
columns=["dag_id", "logical_date"],
)


def downgrade():
Expand All @@ -61,9 +66,11 @@ def downgrade():
"logical_date",
new_column_name="execution_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
nullable=False,
uranusjr marked this conversation as resolved.
Show resolved Hide resolved
)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
columns=["dag_id", "execution_date"],
Expand Down
26 changes: 24 additions & 2 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -141,6 +140,7 @@ class BackfillDagRunExceptionReason(str, Enum):
IN_FLIGHT = "in flight"
ALREADY_EXISTS = "already exists"
UNKNOWN = "unknown"
CLEARED_RUN = "cleared existing run"


class BackfillDagRun(Base):
Expand Down Expand Up @@ -194,7 +194,11 @@ def _get_latest_dag_run_row_query(info, session):
def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None:
non_create_reason = None
if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
if dr.clear_number == 0:
non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
else:
non_create_reason = BackfillDagRunExceptionReason.CLEARED_RUN

vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
elif reprocess_behavior is ReprocessBehavior.NONE:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
elif reprocess_behavior is ReprocessBehavior.FAILED:
Expand Down Expand Up @@ -262,7 +266,23 @@ def _create_backfill_dag_run(
dag_run_conf,
backfill_sort_ordinal,
session,
from_date: datetime,
to_date: datetime,
):
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
from airflow.models import DAG

dr = session.scalar(_get_latest_dag_run_row_query(info, session))
if (
dr
and dr.state not in {DagRunState.RUNNING}
and reprocess_behavior in {ReprocessBehavior.COMPLETED, ReprocessBehavior.FAILED}
):
DAG.clear_dags(
[dag],
start_date=from_date,
end_date=to_date,
dag_run_state=DagRunState.QUEUED,
)
with session.begin_nested() as nested:
dr = session.scalar(
with_row_locks(
Expand Down Expand Up @@ -412,6 +432,8 @@ def _create_backfill(
reprocess_behavior=br.reprocess_behavior,
backfill_sort_ordinal=backfill_sort_ordinal,
session=session,
from_date=from_date,
to_date=to_date,
)
log.info(
"created backfill dag run dag_id=%s backfill_id=%s, info=%s",
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ def create_dagrun(
self,
*,
run_id: str,
logical_date: datetime,
logical_date: datetime | None,
data_interval: tuple[datetime, datetime],
run_after: datetime,
conf: dict | None = None,
Expand Down
11 changes: 8 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class DagRun(Base, LoggingMixin):
id = Column(Integer, primary_key=True)
dag_id = Column(StringID(), nullable=False)
queued_at = Column(UtcDateTime)
logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
logical_date = Column(UtcDateTime, nullable=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
_state = Column("state", String(50), default=DagRunState.QUEUED)
Expand Down Expand Up @@ -186,6 +186,7 @@ class DagRun(Base, LoggingMixin):
__table_args__ = (
Index("dag_id_state", dag_id, _state),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
Index("idx_dag_run_dag_id", dag_id),
Index("idx_dag_run_run_after", run_after),
Index(
Expand Down Expand Up @@ -1321,8 +1322,12 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.run_type == DagRunType.BACKFILL_JOB
uranusjr marked this conversation as resolved.
Show resolved Hide resolved
or (task.start_date is None or task.start_date <= self.logical_date)
and (task.end_date is None or self.logical_date <= task.end_date)
or (
task.start_date is None
or self.logical_date is None
or task.start_date <= self.logical_date
)
and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date)
)

created_counts: dict[str, int] = defaultdict(int)
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412
76818a684a0e05c1fd3ecee6c74b204c9a8d59b22966c62ba08089312fbd6ff4
1 change: 0 additions & 1 deletion docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Drop ``execution_date`` unique constraint on DagRun. |
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Make logical_date nullable. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a2c32e6c7729`` | ``0bfc26bc256e`` | ``3.0.0`` | Add triggered_by field to DagRun. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
10 changes: 7 additions & 3 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _create_dag_run(session, num: int = 2):
dag_id="source_dag_id",
run_id=f"source_run_id_{i}",
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE + timedelta(days=i - 1),
start_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
external_trigger=True,
Expand Down Expand Up @@ -579,7 +579,9 @@ def test_should_respond_200(self, test_client, session):
{
"run_id": "source_run_id_2",
"dag_id": "source_dag_id",
"logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"logical_date": from_datetime_to_zulu_without_ms(
DEFAULT_DATE + timedelta(days=1),
),
"start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"state": "success",
Expand Down Expand Up @@ -747,7 +749,9 @@ def test_should_mask_sensitive_extra(self, test_client, session):
{
"run_id": "source_run_id_2",
"dag_id": "source_dag_id",
"logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"logical_date": from_datetime_to_zulu_without_ms(
DEFAULT_DATE + timedelta(days=1),
),
"start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"state": "success",
Expand Down
44 changes: 21 additions & 23 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio
)

@time_machine.travel(timezone.utcnow(), tick=False)
def test_should_response_200_for_duplicate_logical_date(self, test_client):
def test_should_response_409_for_duplicate_logical_date(self, test_client):
RUN_ID_1 = "random_1"
RUN_ID_2 = "random_2"
now = timezone.utcnow().isoformat().replace("+00:00", "Z")
Expand All @@ -1333,28 +1333,26 @@ def test_should_response_200_for_duplicate_logical_date(self, test_client):
json={"dag_run_id": RUN_ID_2, "note": note},
)

assert response_1.status_code == response_2.status_code == 200
body1 = response_1.json()
body2 = response_2.json()

for each_run_id, each_body in [(RUN_ID_1, body1), (RUN_ID_2, body2)]:
assert each_body == {
"dag_run_id": each_run_id,
"dag_id": DAG1_ID,
"logical_date": now,
"queued_at": now,
"start_date": None,
"end_date": None,
"data_interval_start": now,
"data_interval_end": now,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": note,
}
assert response_1.status_code == 200
assert response_1.json() == {
"dag_run_id": RUN_ID_1,
"dag_id": DAG1_ID,
"logical_date": now,
"queued_at": now,
"start_date": None,
"end_date": None,
"data_interval_start": now,
"data_interval_end": now,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": note,
}

assert response_2.status_code == 409
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize(
"data_interval_start, data_interval_end",
Expand Down
19 changes: 13 additions & 6 deletions tests/api_fastapi/core_api/routes/public/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import datetime as dt
import itertools
import os
from datetime import timedelta
from unittest import mock

import pendulum
Expand Down Expand Up @@ -1027,20 +1028,26 @@ def test_should_respond_200_for_dag_id_filter(self, test_client, session):
assert count == len(response.json()["task_instances"])

@pytest.mark.parametrize(
"order_by_field", ["start_date", "logical_date", "data_interval_start", "data_interval_end"]
"order_by_field, base_date",
[
("start_date", DEFAULT_DATETIME_1 + timedelta(days=20)),
("logical_date", DEFAULT_DATETIME_2),
("data_interval_start", DEFAULT_DATETIME_1 + timedelta(days=5)),
("data_interval_end", DEFAULT_DATETIME_2 + timedelta(days=8)),
],
)
def test_should_respond_200_for_order_by(self, order_by_field, test_client, session):
def test_should_respond_200_for_order_by(self, order_by_field, base_date, test_client, session):
dag_id = "example_python_operator"

dag_runs = [
DagRun(
dag_id=dag_id,
run_id=f"run_{i}",
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATETIME_1 + dt.timedelta(days=i),
logical_date=base_date + dt.timedelta(days=i),
data_interval=(
DEFAULT_DATETIME_1 + dt.timedelta(days=i),
DEFAULT_DATETIME_1 + dt.timedelta(days=i, hours=1),
base_date + dt.timedelta(days=i),
base_date + dt.timedelta(days=i, hours=1),
),
)
for i in range(10)
Expand All @@ -1051,7 +1058,7 @@ def test_should_respond_200_for_order_by(self, order_by_field, test_client, sess
self.create_task_instances(
session,
task_instances=[
{"run_id": f"run_{i}", "start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))}
{"run_id": f"run_{i}", "start_date": base_date + dt.timedelta(minutes=(i + 1))}
for i in range(10)
],
dag_id=dag_id,
Expand Down
24 changes: 12 additions & 12 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,28 +159,32 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
ReprocessBehavior.NONE,
{
"2021-01-01": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
(
ReprocessBehavior.FAILED,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
(
ReprocessBehavior.COMPLETED,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-05": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
Expand All @@ -205,12 +209,8 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session):
# whether a dag run is created for backfill depends on
# the last run for a logical date
("2021-01-02", ["failed"]),
("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed"
("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success"
("2021-01-05", ["success", "success"]),
("2021-01-06", ["failed", "failed"]),
("2021-01-07", ["running", "running"]),
("2021-01-08", ["failed", "running"]),
("2021-01-05", ["success"]),
("2021-01-08", ["running"]),
]
for state in states
]
Expand Down Expand Up @@ -266,12 +266,12 @@ def _get_bdr(date):

# 2021-01-05 already has a "success" run, so it may or may not be reprocessed
# depending on the configured reprocess behavior
bdr = _get_bdr("2021-01-04")
bdr = _get_bdr("2021-01-05")
actual_reason = bdr.exception_reason
if reprocess_behavior is ReprocessBehavior.FAILED:
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS
assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN
elif reprocess_behavior is ReprocessBehavior.COMPLETED:
assert actual_reason is None
assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN
elif reprocess_behavior is ReprocessBehavior.NONE:
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS

Expand Down
1 change: 1 addition & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):

vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
dr = dag_maker.create_dagrun(
run_id="test_dagrun_no_deadlock_1",
run_type=DagRunType.SCHEDULED,
start_date=DEFAULT_DATE,
)
dr2 = dag_maker.create_dagrun_after(
Expand Down
Loading
Loading